[EAGLE-538] Add Mapreduce task level apis

https://issues.apache.org/jira/browse/EAGLE-538

https://issues.apache.org/jira/browse/EAGLE-554

Author: Qingwen Zhao <qingwen...@gmail.com>
Author: Zhao, Qingwen <qingwz...@ebay.com>

Closes #432 from qingwen220/dataSkew.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/cfd5c7f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/cfd5c7f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/cfd5c7f8

Branch: refs/heads/master
Commit: cfd5c7f826a2816bd5bf8db194932d2b3ec9ff4c
Parents: 020a5b3
Author: Qingwen Zhao <qingwen...@gmail.com>
Authored: Thu Sep 22 15:59:49 2016 +0800
Committer: Zhao, Qingwen <qingwz...@ebay.com>
Committed: Thu Sep 22 15:59:49 2016 +0800

----------------------------------------------------------------------
 .../eagle/service/app/TestServiceAppWithZk.java |   8 +-
 .../client/impl/EagleServiceBaseClient.java     |   2 +-
 .../client/impl/EagleServiceClientImpl.java     |   4 +
 .../eagle/service/jpm/MRJobCountHelper.java     | 140 -----------
 .../service/jpm/MRJobExecutionResource.java     | 133 +++-------
 .../service/jpm/MRJobTaskCountResponse.java     |  12 +-
 .../service/jpm/MRTaskExecutionResource.java    | 241 +++++++++++++++++++
 .../service/jpm/MRTaskExecutionResponse.java    |  85 +++++++
 .../apache/eagle/service/jpm/ResourceUtils.java |  87 +++++++
 .../service/jpm/TaskCountByDurationHelper.java  | 106 --------
 .../eagle/service/jpm/count/MRJobCountImpl.java | 146 +++++++++++
 .../service/jpm/count/MRTaskCountImpl.java      | 124 ++++++++++
 .../service/jpm/suggestion/AbstractGCFunc.java  | 104 ++++++++
 .../jpm/suggestion/AbstractInputFunc.java       |  86 +++++++
 .../eagle/service/jpm/suggestion/MapGCFunc.java |  38 +++
 .../service/jpm/suggestion/MapInputFunc.java    |  40 +++
 .../service/jpm/suggestion/MapSpillFunc.java    |  90 +++++++
 .../service/jpm/suggestion/ReduceGCFunc.java    |  38 +++
 .../service/jpm/suggestion/ReduceInputFunc.java |  39 +++
 .../service/jpm/suggestion/SuggestionFunc.java  |  27 +++
 .../jpm/TestJobCountPerBucketHelper.java        |  90 -------
 .../service/jpm/TestTaskCountPerJobHelper.java  |  96 --------
 .../service/jpm/count/TestMRJobCountImpl.java   |  92 +++++++
 .../service/jpm/count/TestMRTaskCountImpl.java  | 129 ++++++++++
 .../jpm/suggestion/TestDataSkewFunc.java        |  89 +++++++
 .../jpm/suggestion/TestTaskCounterFunc.java     |  88 +++++++
 .../spark/history/SparkHistoryJobAppConfig.java |   8 +-
 .../history/crawl/JHFSparkEventReader.java      |   1 +
 .../jpm/spark/history/crawl/JHFSparkParser.java |   7 +-
 .../status/JobHistoryZKStateManager.java        |   8 +-
 .../history/storm/SparkHistoryJobParseBolt.java |   4 +-
 .../history/storm/SparkHistoryJobSpout.java     |  17 +-
 ...spark.history.SparkHistoryJobAppProvider.xml |   6 -
 .../src/main/resources/application.conf         |   3 +-
 .../org/apache/eagle/jpm/util/Constants.java    |   4 +
 .../eagle/jpm/util/jobcounter/JobCounters.java  |  78 ++++++
 eagle-server/pom.xml                            |  11 +-
 37 files changed, 1701 insertions(+), 580 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-core/eagle-alert-parent/eagle-alert/alert-service/src/test/java/com/apache/eagle/service/app/TestServiceAppWithZk.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-service/src/test/java/com/apache/eagle/service/app/TestServiceAppWithZk.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-service/src/test/java/com/apache/eagle/service/app/TestServiceAppWithZk.java
index a19889b..f7f6c57 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-service/src/test/java/com/apache/eagle/service/app/TestServiceAppWithZk.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-service/src/test/java/com/apache/eagle/service/app/TestServiceAppWithZk.java
@@ -31,18 +31,12 @@ import 
org.apache.eagle.alert.service.IMetadataServiceClient;
 import org.apache.eagle.alert.service.MetadataServiceClientImpl;
 import org.apache.eagle.alert.utils.ZookeeperEmbedded;
 import org.apache.eagle.service.app.ServiceApp;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.*;
 
 import com.google.common.base.Joiner;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
-/**
- * @author xiancli
- */
 public class TestServiceAppWithZk {
 
     ZookeeperEmbedded zkEmbed;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceBaseClient.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceBaseClient.java
 
b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceBaseClient.java
index ea5d9a5..3b717d8 100644
--- 
a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceBaseClient.java
+++ 
b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceBaseClient.java
@@ -67,7 +67,7 @@ public abstract class EagleServiceBaseClient implements 
IEagleServiceClient {
 
     private final static Logger LOG = 
LoggerFactory.getLogger(EagleServiceBaseClient.class);
 
-    protected static final String DEFAULT_BASE_PATH = "/rest";
+    public static final String DEFAULT_BASE_PATH = "/rest";
     protected static final MediaType DEFAULT_MEDIA_TYPE = 
MediaType.APPLICATION_JSON_TYPE;
     protected static final String DEFAULT_HTTP_HEADER_CONTENT_TYPE = 
"application/json";
     protected static final String CONTENT_TYPE = "Content-Type";

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java
 
b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java
index 6018469..912f1f7 100644
--- 
a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java
+++ 
b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java
@@ -45,6 +45,10 @@ public class EagleServiceClientImpl extends 
EagleServiceBaseClient {
         super(host, port, username, password);
     }
 
+    public EagleServiceClientImpl(String host, int port, String basePath, 
String username, String password){
+        super(host, port, basePath, username, password);
+    }
+
     private String getWholePath(String urlString){
        return getBaseEndpoint() + urlString;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobCountHelper.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobCountHelper.java
 
b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobCountHelper.java
deleted file mode 100644
index 2fa5c04..0000000
--- 
a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobCountHelper.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.eagle.service.jpm;
-
-import org.apache.eagle.common.DateTimeUtil;
-import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity;
-import org.apache.eagle.jpm.util.Constants;
-import org.apache.eagle.jpm.util.MRJobTagName;
-import org.apache.eagle.service.jpm.MRJobTaskCountResponse.JobCountResponse;
-import org.apache.eagle.service.jpm.MRJobTaskCountResponse.UnitJobCount;
-
-import java.text.ParseException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-public class MRJobCountHelper {
-
-    public void initJobCountList(List<UnitJobCount> jobCounts, long startTime, 
long endTime, long intervalInSecs) {
-        for (long i = startTime / intervalInSecs; i * intervalInSecs <= 
endTime; i++) {
-            jobCounts.add(new UnitJobCount(i * intervalInSecs * 
DateTimeUtil.ONESECOND));
-        }
-    }
-
-    public String moveTimeForwardOneDay(String startTime) throws 
ParseException {
-        long timeInSecs = DateTimeUtil.humanDateToSeconds(startTime);
-        timeInSecs -= 24L * 60L * 60L;
-        return DateTimeUtil.secondsToHumanDate(timeInSecs);
-    }
-
-    public JobCountResponse getRunningJobCount(List<JobExecutionAPIEntity> 
jobDurations,
-                                               
List<org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity> runningJobs,
-                                        long startTimeInSecs,
-                                        long endTimeInSecs,
-                                        long intervalInSecs) {
-        List<UnitJobCount> jobCounts = new ArrayList<>();
-        Set<String> jobTypes = new HashSet<>();
-        initJobCountList(jobCounts, startTimeInSecs, endTimeInSecs, 
intervalInSecs);
-        for (JobExecutionAPIEntity jobDuration: jobDurations) {
-            String jobType = 
jobDuration.getTags().get(MRJobTagName.JOB_TYPE.toString());
-            jobTypes.add(jobType);
-            countJob(jobCounts, jobDuration.getStartTime() / 1000, 
jobDuration.getEndTime() / 1000, intervalInSecs, jobType);
-        }
-        for (org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity job : 
runningJobs) {
-            if (job.getInternalState() != null && 
!job.getInternalState().equalsIgnoreCase(Constants.JobState.FINISHED.toString()))
 {
-                String jobType = 
job.getTags().get(MRJobTagName.JOB_TYPE.toString());
-                jobTypes.add(jobType);
-                countJob(jobCounts, job.getStartTime() / 1000, endTimeInSecs, 
intervalInSecs, jobType);
-            }
-        }
-        JobCountResponse response = new JobCountResponse();
-        response.jobCounts = jobCounts;
-        response.jobTypes = jobTypes;
-        return response;
-    }
-
-    public JobCountResponse getHistoryJobCount(List<JobExecutionAPIEntity> 
jobDurations, String timeList) {
-        JobCountResponse response = new JobCountResponse();
-        List<UnitJobCount> jobCounts = new ArrayList<>();
-        Set<String> jobTypes = new HashSet<>();
-        List<Long> times = TaskCountByDurationHelper.parseTimeList(timeList);
-        for (int i = 0; i < times.size(); i++) {
-            jobCounts.add(new UnitJobCount(times.get(i)));
-        }
-        for (JobExecutionAPIEntity job : jobDurations) {
-            int jobIndex = TaskCountByDurationHelper.getPosition(times, 
job.getDurationTime());
-            UnitJobCount counter = jobCounts.get(jobIndex);
-            String jobType = 
job.getTags().get(MRJobTagName.JOB_TYPE.toString());
-            jobTypes.add(jobType);
-            countJob(counter, jobType);
-        }
-        response.jobCounts = jobCounts;
-        response.jobTypes = jobTypes;
-        return response;
-    }
-
-    public void countJob(UnitJobCount counter, String jobType) {
-        if (null  ==  jobType) {
-            jobType = "null";
-        }
-        counter.jobCount++;
-        if (counter.jobCountByType.containsKey(jobType)) {
-            counter.jobCountByType.put(jobType, 
counter.jobCountByType.get(jobType) + 1);
-        } else {
-            counter.jobCountByType.put(jobType, 1L);
-        }
-    }
-
-    public void countJob(List<UnitJobCount> jobCounts, long jobStartTimeSecs, 
long jobEndTimeSecs, long intervalInSecs, String jobType) {
-        long startCountPoint = jobCounts.get(0).timeBucket / 
DateTimeUtil.ONESECOND;
-        if (jobEndTimeSecs < startCountPoint) {
-            return;
-        }
-        int startIndex = 0;
-        if (jobStartTimeSecs > startCountPoint) {
-            long relativeStartTime = jobStartTimeSecs - startCountPoint;
-            startIndex = (int) (relativeStartTime / intervalInSecs) + 
(relativeStartTime % intervalInSecs == 0 ? 0 : 1);
-        }
-        long relativeEndTime = jobEndTimeSecs - startCountPoint;
-        int endIndex = (int) (relativeEndTime / intervalInSecs);
-
-        for (int i = startIndex; i <= endIndex && i < jobCounts.size(); i++) {
-            countJob(jobCounts.get(i), jobType);
-        }
-    }
-
-    public List<String> getSearchTimeDuration(List<JobExecutionAPIEntity> 
jobEntities) {
-        List<String> pair = new ArrayList<>();
-        long minStartTime = System.currentTimeMillis();
-        long maxEndTime = 0;
-        for (JobExecutionAPIEntity jobEntity : jobEntities) {
-            if (minStartTime > jobEntity.getStartTime()) {
-                minStartTime = jobEntity.getStartTime();
-            }
-            if (maxEndTime < jobEntity.getEndTime()) {
-                maxEndTime = jobEntity.getEndTime();
-            }
-        }
-        
pair.add(DateTimeUtil.millisecondsToHumanDateWithSeconds(minStartTime));
-        pair.add(DateTimeUtil.millisecondsToHumanDateWithSeconds(maxEndTime));
-        return pair;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java
 
b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java
index e6041f2..204412b 100644
--- 
a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java
+++ 
b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java
@@ -18,21 +18,18 @@
 
 package org.apache.eagle.service.jpm;
 
-import static org.apache.eagle.jpm.util.MRJobTagName.JOB_ID;
-import static org.apache.eagle.jpm.util.MRJobTagName.TASK_TYPE;
-
 import org.apache.eagle.common.DateTimeUtil;
 import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity;
-import org.apache.eagle.jpm.mr.runningentity.TaskExecutionAPIEntity;
 import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.MRJobTagName;
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
 import org.apache.eagle.service.generic.GenericEntityServiceResource;
 import org.apache.eagle.service.generic.ListQueryResource;
 import org.apache.eagle.service.jpm.MRJobTaskCountResponse.JobCountResponse;
-import 
org.apache.eagle.service.jpm.MRJobTaskCountResponse.TaskCountPerJobResponse;
 
 import org.apache.commons.lang.time.StopWatch;
+import org.apache.eagle.service.jpm.count.MRJobCountImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,11 +40,14 @@ import javax.ws.rs.core.MediaType;
 
 @Path("mrJobs")
 public class MRJobExecutionResource {
-    GenericEntityServiceResource resource = new GenericEntityServiceResource();
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MRJobExecutionResource.class);
+
     public static final String ELAPSEDMS = "elapsedms";
     public static final String TOTAL_RESULTS = "totalResults";
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(MRJobExecutionResource.class);
+    private final MRJobCountImpl helper = new MRJobCountImpl();
+    private GenericEntityServiceResource resource = new 
GenericEntityServiceResource();
 
     @GET
     @Produces(MediaType.APPLICATION_JSON)
@@ -76,14 +76,15 @@ public class MRJobExecutionResource {
         if (res.isSuccess() && res.getObj() != null) {
             for (TaggedLogAPIEntity o : res.getObj()) {
                 finishedJobs.add(o);
-                jobIds.add(o.getTags().get(JOB_ID.toString()));
+                jobIds.add(o.getTags().get(MRJobTagName.JOB_ID.toString()));
             }
             jobQuery = String.format(query, 
Constants.JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME);
             res = resource.search(jobQuery, startTime, endTime, pageSize, 
startRowkey, treeAgg, timeSeries, intervalmin,
                 top, filterIfMissing, parallel, metricName, verbose);
             if (res.isSuccess() && res.getObj() != null) {
                 for (TaggedLogAPIEntity o : res.getObj()) {
-                    if (!isDuplicate(jobIds, o)) {
+                    String key = 
o.getTags().get(MRJobTagName.JOB_ID.toString());
+                    if (!ResourceUtils.isDuplicate(jobIds, key)) {
                         jobs.add(o);
                     }
                 }
@@ -105,13 +106,6 @@ public class MRJobExecutionResource {
 
     }
 
-    private boolean isDuplicate(Set<String> keys, TaggedLogAPIEntity o) {
-        if (keys.isEmpty()) {
-            return false;
-        }
-        return keys.contains(o.getTags().get(JOB_ID.toString()));
-    }
-
     private String buildCondition(String jobId, String jobDefId, String site) {
         String conditionFormat = "@site=\"%s\"";
         String condition = null;
@@ -142,7 +136,7 @@ public class MRJobExecutionResource {
         List<TaggedLogAPIEntity> jobs = new ArrayList<>();
         Set<String> jobIds = new HashSet<>();
         String condition = buildCondition(jobId, jobDefId, site);
-        final int pageSize = Integer.MAX_VALUE;
+
         if (condition == null) {
             response.setException(new Exception("Search condition is empty"));
             response.setSuccess(false);
@@ -155,18 +149,19 @@ public class MRJobExecutionResource {
         stopWatch.start();
         String queryFormat = "%s[%s]{*}";
         String queryString = String.format(queryFormat, 
Constants.JPA_JOB_EXECUTION_SERVICE_NAME, condition);
-        GenericServiceAPIResponseEntity<TaggedLogAPIEntity> res = 
resource.search(queryString, null, null, pageSize, null, false, true, 0L, 0, 
true, 0, null, false);
+        GenericServiceAPIResponseEntity<TaggedLogAPIEntity> res = 
ResourceUtils.getQueryResult(queryString, null, null);
         if (res.isSuccess() && res.getObj() != null) {
             for (TaggedLogAPIEntity o : res.getObj()) {
                 jobs.add(o);
-                jobIds.add(o.getTags().get(JOB_ID.toString()));
+                jobIds.add(o.getTags().get(MRJobTagName.JOB_ID.toString()));
             }
         }
         queryString = String.format(queryFormat, 
Constants.JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME, condition);
-        res = resource.search(queryString, null, null, pageSize, null, false, 
true, 0L, 0, true, 0, null, false);
+        res = ResourceUtils.getQueryResult(queryString, null, null);
         if (res.isSuccess() && res.getObj() != null) {
             for (TaggedLogAPIEntity o : res.getObj()) {
-                if (!isDuplicate(jobIds, o)) {
+                String key = o.getTags().get(MRJobTagName.JOB_ID.toString());
+                if (!ResourceUtils.isDuplicate(jobIds, key)) {
                     jobs.add(o);
                 }
             }
@@ -193,67 +188,6 @@ public class MRJobExecutionResource {
         return response;
     }
 
-
-
-
-    @GET
-    @Path("{jobId}/taskCountsByDuration")
-    @Produces(MediaType.APPLICATION_JSON)
-    public TaskCountPerJobResponse getTaskCountsPerJob(@PathParam("jobId") 
String jobId,
-                                                       @QueryParam("site") 
String site,
-                                                       
@QueryParam("timelineInSecs") String timeList,
-                                                       @QueryParam("top") long 
top) {
-        TaskCountPerJobResponse response = new TaskCountPerJobResponse();
-        if (jobId == null || site == null || timeList == null || 
timeList.isEmpty()) {
-            response.errMessage = "IllegalArgumentException: jobId == null || 
site == null || timelineInSecs == null or isEmpty";
-            return response;
-        }
-        TaskCountByDurationHelper helper = new TaskCountByDurationHelper();
-        List<MRJobTaskCountResponse.UnitTaskCount> runningTaskCount = new 
ArrayList<>();
-        List<MRJobTaskCountResponse.UnitTaskCount> finishedTaskCount = new 
ArrayList<>();
-
-        List<Long> times = helper.parseTimeList(timeList);
-        String query = String.format("%s[@site=\"%s\" AND @jobId=\"%s\"]{*}", 
Constants.JPA_TASK_EXECUTION_SERVICE_NAME, site, jobId);
-        
GenericServiceAPIResponseEntity<org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity>
 historyRes =
-            resource.search(query, null, null, Integer.MAX_VALUE, null, false, 
true, 0L, 0, true, 0, null, false);
-        if (historyRes.isSuccess() && historyRes.getObj() != null && 
historyRes.getObj().size() > 0) {
-            helper.initTaskCountList(runningTaskCount, finishedTaskCount, 
times, new TaskCountByDurationHelper.HistoryTaskComparator());
-            for (org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity 
o : historyRes.getObj()) {
-                int index = helper.getPosition(times, o.getDuration());
-                MRJobTaskCountResponse.UnitTaskCount counter = 
finishedTaskCount.get(index);
-                helper.countTask(counter, 
o.getTags().get(TASK_TYPE.toString()));
-                counter.entities.add(o);
-            }
-        } else {
-            query = String.format("%s[@site=\"%s\" AND @jobId=\"%s\"]{*}", 
Constants.JPA_RUNNING_TASK_EXECUTION_SERVICE_NAME, site, jobId);
-            GenericServiceAPIResponseEntity<TaskExecutionAPIEntity> runningRes 
=
-                resource.search(query, null, null, Integer.MAX_VALUE, null, 
false, true, 0L, 0, true, 0, null, false);
-            if (runningRes.isSuccess() && runningRes.getObj() != null) {
-                helper.initTaskCountList(runningTaskCount, finishedTaskCount, 
times, new TaskCountByDurationHelper.RunningTaskComparator());
-                for (TaskExecutionAPIEntity o : runningRes.getObj()) {
-                    int index = helper.getPosition(times, o.getDuration());
-                    if 
(o.getTaskStatus().equalsIgnoreCase(Constants.TaskState.RUNNING.toString())) {
-                        MRJobTaskCountResponse.UnitTaskCount counter = 
runningTaskCount.get(index);
-                        helper.countTask(counter, 
o.getTags().get(TASK_TYPE.toString()));
-                        counter.entities.add(o);
-                    } else if (o.getEndTime() != 0) {
-                        MRJobTaskCountResponse.UnitTaskCount counter = 
finishedTaskCount.get(index);
-                        helper.countTask(counter, 
o.getTags().get(TASK_TYPE.toString()));
-                        counter.entities.add(o);
-                    }
-                }
-            }
-        }
-        if (top > 0) {
-            helper.getTopTasks(runningTaskCount, top);
-            response.runningTaskCount = runningTaskCount;
-            helper.getTopTasks(finishedTaskCount, top);
-            response.finishedTaskCount = finishedTaskCount;
-        }
-        response.topNumber = top;
-        return response;
-    }
-
     @GET
     @Path("runningJobCounts")
     @Produces(MediaType.APPLICATION_JSON)
@@ -262,7 +196,6 @@ public class MRJobExecutionResource {
                                                @QueryParam("durationEnd") 
String endTime,
                                                @QueryParam("intervalInSecs") 
long intervalInSecs) {
         JobCountResponse response = new JobCountResponse();
-        MRJobCountHelper helper = new MRJobCountHelper();
         if (site == null || startTime == null || endTime == null) {
             response.errMessage = "IllegalArgument: site, durationBegin, 
durationEnd is null";
             return response;
@@ -271,26 +204,22 @@ public class MRJobExecutionResource {
             response.errMessage = String.format("IllegalArgument: 
intervalInSecs=%s is invalid", intervalInSecs);
             return response;
         }
-        long startTimeInMills;
         String searchStartTime = startTime;
         String searchEndTime = endTime;
         try {
-            startTimeInMills = DateTimeUtil.humanDateToSeconds(startTime) * 
DateTimeUtil.ONESECOND;
             searchStartTime = helper.moveTimeForwardOneDay(searchStartTime);
         } catch (Exception e) {
             response.errMessage = e.getMessage();
             return response;
         }
-        String query = String.format("%s[@site=\"%s\" AND 
@endTime>=%s]{@startTime,@endTime,@jobType}", 
Constants.JPA_JOB_EXECUTION_SERVICE_NAME, site, startTimeInMills);
-        GenericServiceAPIResponseEntity<JobExecutionAPIEntity> historyRes =
-            resource.search(query, searchStartTime, searchEndTime, 
Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false);
+        String query = 
String.format("%s[@site=\"%s\"]{@startTime,@endTime,@jobType,@jobId}", 
Constants.JPA_JOB_EXECUTION_SERVICE_NAME, site);
+        GenericServiceAPIResponseEntity<JobExecutionAPIEntity> historyRes = 
ResourceUtils.getQueryResult(query, searchStartTime, searchEndTime);
         if (!historyRes.isSuccess() || historyRes.getObj() == null) {
             response.errMessage = String.format("Catch an exception during 
fetch history jobs: %s with query=%s", historyRes.getException(), query);
             return response;
         }
-        query = 
String.format("%s[@site=\"%s\"]{@startTime,@endTime,@jobType}", 
Constants.JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME, site);
-        
GenericServiceAPIResponseEntity<org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity>
 runningRes =
-            resource.search(query, searchStartTime, searchEndTime, 
Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false);
+        query = 
String.format("%s[@site=\"%s\"]{@startTime,@endTime,@jobType,@jobId}", 
Constants.JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME, site);
+        
GenericServiceAPIResponseEntity<org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity>
 runningRes = ResourceUtils.getQueryResult(query, searchStartTime, 
searchEndTime);
         if (!runningRes.isSuccess() || runningRes.getObj() == null) {
             response.errMessage = String.format("Catch an exception during 
fetch running jobs: %s with query=%s", runningRes.getException(), query);
             return response;
@@ -332,7 +261,6 @@ public class MRJobExecutionResource {
     public Object getJobMetrics(String site, String timePoint, String 
metricName, long intervalmin, int top,
                                 Function6<String, String, String, Long, 
Integer, String, Object> metricQueryFunc) {
         GenericServiceAPIResponseEntity response = new 
GenericServiceAPIResponseEntity();
-        MRJobCountHelper helper = new MRJobCountHelper();
         if (site == null || timePoint == null || metricName == null) {
             response.setException(new IllegalArgumentException("Error: site, 
timePoint, metricName may be unset"));
             response.setSuccess(false);
@@ -360,8 +288,7 @@ public class MRJobExecutionResource {
         }
         String query = String.format("%s[@site=\"%s\" AND @startTime<=\"%s\" 
AND @endTime>=\"%s\"]{@startTime,@endTime}",
             Constants.JPA_JOB_EXECUTION_SERVICE_NAME, site, timePointsInMills, 
timePointsInMills);
-        GenericServiceAPIResponseEntity<JobExecutionAPIEntity> historyRes =
-            resource.search(query, searchStartTime, searchEndTime, 
Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false);
+        GenericServiceAPIResponseEntity<JobExecutionAPIEntity> historyRes = 
ResourceUtils.getQueryResult(query, searchStartTime, searchEndTime);
         if (!historyRes.isSuccess() || historyRes.getObj() == null) {
             return historyRes;
         }
@@ -372,14 +299,14 @@ public class MRJobExecutionResource {
         return metricQueryFunc.apply(query, timeDuration.get(0), 
timeDuration.get(1), intervalmin, top, metricName);
     }
 
-    Function6<String, String, String, Long, Integer, String, Object> 
queryMetricEntitiesFunc
+    private Function6<String, String, String, Long, Integer, String, Object> 
queryMetricEntitiesFunc
         = (query, startTime, endTime, intervalmin, top, metricName) -> {
             GenericEntityServiceResource resource = new 
GenericEntityServiceResource();
             return resource.search(query, startTime, endTime, 
Integer.MAX_VALUE, null,
             false, true, intervalmin, top, true, 0, metricName, false);
         };
 
-    Function6<String, String, String, Long, Integer, String, Object> 
queryMetricListFunc
+    private Function6<String, String, String, Long, Integer, String, Object> 
queryMetricListFunc
         = (query, startTime, endTime, intervalmin, top, metricName) -> {
             ListQueryResource resource = new ListQueryResource();
             return resource.listQuery(query, startTime, endTime, 
Integer.MAX_VALUE, null,
@@ -395,24 +322,22 @@ public class MRJobExecutionResource {
     @Path("jobCountsByDuration")
     @Produces(MediaType.APPLICATION_JSON)
     public JobCountResponse getJobCountGroupByDuration(@QueryParam("site") 
String site,
-                                                       
@QueryParam("timelineInSecs") String timeList,
-                                                       
@QueryParam("jobStartTimeBegin") String startTime,
-                                                       
@QueryParam("jobStartTimeEnd") String endTime) {
+                                                       
@QueryParam("timeDistInSecs") String timeList,
+                                                       
@QueryParam("startTime") String startTime,
+                                                       @QueryParam("endTime") 
String endTime) {
         JobCountResponse response = new JobCountResponse();
-        MRJobCountHelper helper = new MRJobCountHelper();
         if (site == null || startTime == null || endTime == null || timeList 
== null) {
-            response.errMessage = "IllegalArgument: site, jobStartTimeBegin, 
jobStartTimeEnd, or timelineInSecs is null";
+            response.errMessage = "IllegalArgument: site, startTime, endTime, 
or timeDistInSecs is null";
             return response;
         }
         String query = 
String.format("%s[@site=\"%s\"]{@durationTime,@jobType}", 
Constants.JPA_JOB_EXECUTION_SERVICE_NAME, site);
-        GenericServiceAPIResponseEntity<JobExecutionAPIEntity> historyRes =
-            resource.search(query, startTime, endTime, Integer.MAX_VALUE, 
null, false, true, 0L, 0, true, 0, null, false);
+        GenericServiceAPIResponseEntity<JobExecutionAPIEntity> historyRes = 
ResourceUtils.getQueryResult(query, startTime, endTime);
         if (!historyRes.isSuccess() || historyRes.getObj() == null) {
             response.errMessage = String.format("Catch an exception: %s with 
query=%s", historyRes.getException(), query);
             return response;
         }
         try {
-            return helper.getHistoryJobCount(historyRes.getObj(), timeList);
+            return 
helper.getHistoryJobCountGroupByDuration(historyRes.getObj(), timeList);
         } catch (Exception e) {
             response.errMessage = e.getMessage();
             return response;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskCountResponse.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskCountResponse.java
 
b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskCountResponse.java
index 170533c..45ffbd7 100644
--- 
a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskCountResponse.java
+++ 
b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskCountResponse.java
@@ -29,12 +29,16 @@ public class MRJobTaskCountResponse {
         public List<UnitTaskCount> finishedTaskCount;
     }
 
+    /**
+     * Response whose payload is a list of {@link UnitTaskCount} entries.
+     */
+    public static class HistoryTaskCountResponse extends 
MRJobTaskCountResponse {
+        // One UnitTaskCount per time bucket; populated by the code that builds the response.
+        public List<UnitTaskCount> taskCount;
+    }
+
     public static class JobCountResponse extends MRJobTaskCountResponse {
         public Set<String> jobTypes;
         public List<UnitJobCount> jobCounts;
     }
 
-    static class UnitTaskCount {
+    public static class UnitTaskCount {
         public long timeBucket;
         public int taskCount;
         public int mapTaskCount;
@@ -42,7 +46,7 @@ public class MRJobTaskCountResponse {
         public Set entities;
         public List topEntities;
 
-        UnitTaskCount(long timeBucket, Comparator comparator) {
+        public UnitTaskCount(long timeBucket, Comparator comparator) {
             this.timeBucket = timeBucket;
             this.taskCount = 0;
             this.mapTaskCount = 0;
@@ -52,12 +56,12 @@ public class MRJobTaskCountResponse {
         }
     }
 
-    static class UnitJobCount {
+    public static class UnitJobCount {
         public long timeBucket;
         public long jobCount;
         public Map<String, Long> jobCountByType;
 
-        UnitJobCount(long timeBucket) {
+        public UnitJobCount(long timeBucket) {
             this.timeBucket = timeBucket;
             this.jobCount = 0;
             this.jobCountByType = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRTaskExecutionResource.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRTaskExecutionResource.java
 
b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRTaskExecutionResource.java
new file mode 100644
index 0000000..1125387
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRTaskExecutionResource.java
@@ -0,0 +1,241 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm;
+
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.MRJobTagName;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+
+import org.apache.eagle.service.jpm.count.MRTaskCountImpl;
+import org.apache.eagle.service.jpm.suggestion.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.apache.eagle.jpm.util.MRJobTagName.TASK_TYPE;
+
+@Path("mrTasks")
+public class MRTaskExecutionResource {
+    private static final Logger LOG = 
LoggerFactory.getLogger(MRTaskExecutionResource.class);
+    MRTaskCountImpl taskCountImpl = new MRTaskCountImpl();
+
+    /**
+     * Counts the tasks of one job grouped into duration buckets.
+     *
+     * <p>The history task entities are queried first; only when that query fails or returns no
+     * entity are the running task entities queried instead. Each task is placed in the bucket that
+     * {@link ResourceUtils#getDistributionPosition} selects for its duration divided by
+     * {@code DateTimeUtil.ONESECOND}.
+     *
+     * @param site site tag used in the query
+     * @param jobId job whose tasks are counted
+     * @param jobStartTime start of the search window, passed through to the entity query
+     * @param jobEndTime end of the search window, passed through to the entity query
+     * @param timeDistInSecs bucket boundaries, separated by commas or whitespace
+     * @param top number of tasks kept per bucket; the bucket lists are attached to the response
+     *            only when this value is positive
+     * @return the per-bucket counts, or a response carrying only {@code errMessage} when an
+     *         argument is missing or {@code timeDistInSecs} contains no number
+     */
+    @GET
+    @Path("taskCountsByDuration")
+    @Produces(MediaType.APPLICATION_JSON)
+    public MRJobTaskCountResponse.TaskCountPerJobResponse getTaskCountsGroupByDuration(@QueryParam("site") String site,
+                                                                                       @QueryParam("jobId") String jobId,
+                                                                                       @QueryParam("jobStartTime") String jobStartTime,
+                                                                                       @QueryParam("jobEndTime") String jobEndTime,
+                                                                                       @QueryParam("timeDistInSecs") String timeDistInSecs,
+                                                                                       @QueryParam("top") long top) {
+        MRJobTaskCountResponse.TaskCountPerJobResponse response = new MRJobTaskCountResponse.TaskCountPerJobResponse();
+        if (jobId == null || site == null || timeDistInSecs == null || timeDistInSecs.isEmpty()) {
+            response.errMessage = "IllegalArgumentException: jobId == null || site == null || timeDistInSecs == null or isEmpty";
+            return response;
+        }
+
+        List<Long> times = ResourceUtils.parseDistributionList(timeDistInSecs);
+        if (times.isEmpty()) {
+            // Without at least one boundary there is no bucket to count into; the bucket lookup
+            // below would yield index -1 and fail with an IndexOutOfBoundsException.
+            response.errMessage = "IllegalArgumentException: timeDistInSecs contains no valid number: " + timeDistInSecs;
+            return response;
+        }
+
+        List<MRJobTaskCountResponse.UnitTaskCount> runningTaskCount = new ArrayList<>();
+        List<MRJobTaskCountResponse.UnitTaskCount> finishedTaskCount = new ArrayList<>();
+
+        String query = String.format("%s[@site=\"%s\" AND @jobId=\"%s\"]{*}", Constants.JPA_TASK_EXECUTION_SERVICE_NAME, site, jobId);
+        GenericServiceAPIResponseEntity<org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity> historyRes =
+            ResourceUtils.getQueryResult(query, jobStartTime, jobEndTime);
+        if (historyRes.isSuccess() && historyRes.getObj() != null && historyRes.getObj().size() > 0) {
+            // Every task returned by the history query is counted as finished.
+            taskCountImpl.initTaskCountList(runningTaskCount, finishedTaskCount, times, new MRTaskCountImpl.HistoryTaskComparator());
+            for (org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity o : historyRes.getObj()) {
+                int index = ResourceUtils.getDistributionPosition(times, o.getDuration() / DateTimeUtil.ONESECOND);
+                MRJobTaskCountResponse.UnitTaskCount counter = finishedTaskCount.get(index);
+                taskCountImpl.countTask(counter, o.getTags().get(TASK_TYPE.toString()));
+                counter.entities.add(o);
+            }
+        } else {
+            // Nothing in the history store: fall back to the running task entities.
+            query = String.format("%s[@site=\"%s\" AND @jobId=\"%s\"]{*}", Constants.JPA_RUNNING_TASK_EXECUTION_SERVICE_NAME, site, jobId);
+            GenericServiceAPIResponseEntity<org.apache.eagle.jpm.mr.runningentity.TaskExecutionAPIEntity> runningRes =
+                ResourceUtils.getQueryResult(query, jobStartTime, jobEndTime);
+            if (runningRes.isSuccess() && runningRes.getObj() != null) {
+                taskCountImpl.initTaskCountList(runningTaskCount, finishedTaskCount, times, new MRTaskCountImpl.RunningTaskComparator());
+                for (org.apache.eagle.jpm.mr.runningentity.TaskExecutionAPIEntity o : runningRes.getObj()) {
+                    int index = ResourceUtils.getDistributionPosition(times, o.getDuration() / DateTimeUtil.ONESECOND);
+                    if (o.getTaskStatus().equalsIgnoreCase(Constants.TaskState.RUNNING.toString())) {
+                        MRJobTaskCountResponse.UnitTaskCount counter = runningTaskCount.get(index);
+                        taskCountImpl.countTask(counter, o.getTags().get(TASK_TYPE.toString()));
+                        counter.entities.add(o);
+                    } else if (o.getEndTime() != 0) {
+                        // Not running but with an end time recorded: counted as finished.
+                        MRJobTaskCountResponse.UnitTaskCount counter = finishedTaskCount.get(index);
+                        taskCountImpl.countTask(counter, o.getTags().get(TASK_TYPE.toString()));
+                        counter.entities.add(o);
+                    }
+                }
+            }
+        }
+        // NOTE(review): with top <= 0 the bucket lists are never attached to the response;
+        // kept as in the original - confirm this is intended.
+        if (top > 0) {
+            taskCountImpl.getTopTasks(runningTaskCount, top);
+            response.runningTaskCount = runningTaskCount;
+            taskCountImpl.getTopTasks(finishedTaskCount, top);
+            response.finishedTaskCount = finishedTaskCount;
+        }
+        response.topNumber = top;
+        return response;
+    }
+
+    private MRTaskExecutionResponse.TaskGroupResponse 
getTaskGroups(@QueryParam("site") String site,
+                                                                   
@QueryParam("shortJob_id") String shortDurationJobId,
+                                                                   
@QueryParam("longJob_id") String longDurationJobId) {
+        MRTaskExecutionResponse.TaskGroupResponse result = new 
MRTaskExecutionResponse.TaskGroupResponse();
+        String query = String.format("%s[@site=\"%s\" AND @jobId=\"%s\"]{*}", 
Constants.JPA_TASK_EXECUTION_SERVICE_NAME, site, shortDurationJobId);
+        GenericServiceAPIResponseEntity<TaskExecutionAPIEntity> smallResponse 
= ResourceUtils.getQueryResult(query, null, null);
+        if (!smallResponse.isSuccess() || smallResponse.getObj() == null) {
+            result.errMessage = smallResponse.getException();
+            return result;
+        }
+        long longestDuration = 0;
+        for (TaskExecutionAPIEntity entity : smallResponse.getObj()) {
+            if (entity.getDuration() > longestDuration) {
+                longestDuration = entity.getDuration();
+            }
+        }
+        query = String.format("%s[@site=\"%s\" AND @jobId=\"%s\"]{*}", 
Constants.JPA_TASK_EXECUTION_SERVICE_NAME, site, longDurationJobId);
+        GenericServiceAPIResponseEntity<TaskExecutionAPIEntity> largeResponse 
= ResourceUtils.getQueryResult(query, null, null);
+        if (!largeResponse.isSuccess() || largeResponse.getObj() == null) {
+            result.errMessage = largeResponse.getException();
+            return result;
+        }
+        result.tasksGroupByType = new HashMap<>();
+        result.tasksGroupByType.put(Constants.TaskType.MAP.toString(), new 
MRTaskExecutionResponse.TaskGroup());
+        result.tasksGroupByType.put(Constants.TaskType.REDUCE.toString(), new 
MRTaskExecutionResponse.TaskGroup());
+        groupTasksByValue(result, false, largeResponse.getObj(), 
longestDuration);
+        groupTasksByValue(result, true, smallResponse.getObj(), 
longestDuration);
+
+        return result;
+    }
+
+    public MRTaskExecutionResponse.TaskGroupResponse 
groupTasksByValue(MRTaskExecutionResponse.TaskGroupResponse result, boolean 
keepShort, List<TaskExecutionAPIEntity> tasks, long value) {
+        for (TaskExecutionAPIEntity entity : tasks) {
+            String taskType = 
entity.getTags().get(MRJobTagName.TASK_TYPE.toString());
+            MRTaskExecutionResponse.TaskGroup taskGroup = 
result.tasksGroupByType.get(taskType.toUpperCase());
+            if (entity.getDuration() <= value && keepShort) {
+                taskGroup.shortTasks.add(entity);
+            }
+            if (entity.getDuration() > value) {
+                taskGroup.longTasks.add(entity);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Compares the tasks of a short-duration and a long-duration job and applies every
+     * suggestion function to the grouped tasks.
+     *
+     * @param site site tag used to look up both jobs
+     * @param shortDurationJobId id of the short-duration job
+     * @param longDurationJobId id of the long-duration job
+     * @param mapInputThreshold threshold handed to {@code MapInputFunc}
+     * @param reduceInputThreshold threshold handed to {@code ReduceInputFunc}
+     * @param mapGcThreshold threshold handed to {@code MapGCFunc}
+     * @param reduceGcThreshold threshold handed to {@code ReduceGCFunc}
+     * @param mapSpillThreshold threshold handed to {@code MapSpillFunc}
+     * @return one response per suggestion function that completed; empty when the tasks could
+     *         not be loaded, partial when a function throws
+     */
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("taskSuggestion")
+    public List<MRTaskExecutionResponse.JobSuggestionResponse> getSuggestion(@QueryParam("site") String site,
+                                                                             @QueryParam("shortJob_id") String shortDurationJobId,
+                                                                             @QueryParam("longJob_id") String longDurationJobId,
+                                                                             @QueryParam("mapInputThreshold") long mapInputThreshold,
+                                                                             @QueryParam("reduceInputThreshold") long reduceInputThreshold,
+                                                                             @QueryParam("mapGcThreshold") long mapGcThreshold,
+                                                                             @QueryParam("reduceGcThreshold") long reduceGcThreshold,
+                                                                             @QueryParam("mapSpillThreshold") long mapSpillThreshold) {
+        List<MRTaskExecutionResponse.JobSuggestionResponse> result = new ArrayList<>();
+        MRTaskExecutionResponse.TaskGroupResponse taskGroups = getTaskGroups(site, shortDurationJobId, longDurationJobId);
+        if (taskGroups.errMessage != null) {
+            LOG.error(taskGroups.errMessage);
+            return result;
+        }
+        List<SuggestionFunc> suggestionFuncs = new ArrayList<>();
+        suggestionFuncs.add(new MapInputFunc(mapInputThreshold));
+        suggestionFuncs.add(new ReduceInputFunc(reduceInputThreshold));
+        suggestionFuncs.add(new MapGCFunc(mapGcThreshold));
+        suggestionFuncs.add(new ReduceGCFunc(reduceGcThreshold));
+        suggestionFuncs.add(new MapSpillFunc(mapSpillThreshold));
+        try {
+            for (SuggestionFunc func : suggestionFuncs) {
+                result.add(func.apply(taskGroups));
+            }
+        } catch (Exception ex) {
+            // Report through the class logger instead of stderr; the suggestions gathered before
+            // the failure are still returned, as before.
+            LOG.error("Fail to compute task suggestions for shortJob_id={} and longJob_id={}",
+                shortDurationJobId, longDurationJobId, ex);
+        }
+        return result;
+    }
+
+    /**
+     * Counts the history tasks of one job between the given start and end time.
+     *
+     * <p>Both times are converted with {@code DateTimeUtil.humanDateToSeconds} and divided by 60
+     * before being handed to {@code MRTaskCountImpl#countHistoryTask}.
+     *
+     * @param site site tag used in the query
+     * @param jobId job whose tasks are counted
+     * @param jobStartTime start of the window; also used as the query start time
+     * @param jobEndTime end of the window; also used as the query end time
+     * @return the task counts, or a response carrying only {@code errMessage} when an argument
+     *         is null, the query fails, or counting throws
+     */
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("historyTaskCount")
+    public MRJobTaskCountResponse.HistoryTaskCountResponse getTaskCountInMinute(@QueryParam("site") String site,
+                                                                                @QueryParam("jobId") String jobId,
+                                                                                @QueryParam("jobStartTime") String jobStartTime,
+                                                                                @QueryParam("jobEndTime") String jobEndTime) {
+        MRJobTaskCountResponse.HistoryTaskCountResponse result = new MRJobTaskCountResponse.HistoryTaskCountResponse();
+        if (jobId == null || site == null || jobStartTime == null || jobEndTime == null) {
+            result.errMessage = "IllegalArgumentException: jobId, or site, or jobStartTime, or jobEndTime is null";
+            return result;
+        }
+
+        String query = String.format("%s[@site=\"%s\" AND @jobId=\"%s\"]{@startTime,@endTime,@taskType}",
+            Constants.JPA_TASK_EXECUTION_SERVICE_NAME, site, jobId);
+        GenericServiceAPIResponseEntity<TaskExecutionAPIEntity> response = ResourceUtils.getQueryResult(query, jobStartTime, jobEndTime);
+        if (!response.isSuccess() || response.getObj() == null) {
+            result.errMessage = response.getException();
+            return result;
+        }
+        try {
+            long startTimeInMin = DateTimeUtil.humanDateToSeconds(jobStartTime) / 60;
+            long endTimeInMin = DateTimeUtil.humanDateToSeconds(jobEndTime) / 60;
+            return taskCountImpl.countHistoryTask(response.getObj(), startTimeInMin, endTimeInMin);
+        } catch (Exception e) {
+            // Report through the class logger instead of stderr; the caller still gets the message.
+            LOG.error("Fail to count history tasks of job {} with query={}", jobId, query, e);
+            result.errMessage = e.getMessage();
+            return result;
+        }
+    }
+
+    /**
+     * Returns the distribution of a job's history tasks over the value ranges of one counter.
+     *
+     * @param site site tag used in the query
+     * @param jobId job whose tasks are inspected
+     * @param jobStartTime start of the search window, passed through to the entity query
+     * @param jobEndTime end of the search window, passed through to the entity query
+     * @param taskType task type tag to filter on; when null or empty the MAP type is used, which
+     *                 is what the query previously always filtered on
+     * @param counterName name of the counter, taken from the request path
+     * @param distRange range boundaries handed to {@code MRTaskCountImpl#getHistoryTaskDistribution}
+     * @return the distribution, or a response carrying only {@code errMessage} when an argument
+     *         is null, the query fails, or the computation throws
+     */
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("taskDistribution/{counterName}")
+    public MRTaskExecutionResponse.TaskDistributionResponse getTaskDistributionByCounterName(@QueryParam("site") String site,
+                                                                              @QueryParam("jobId") String jobId,
+                                                                              @QueryParam("jobStartTime") String jobStartTime,
+                                                                              @QueryParam("jobEndTime") String jobEndTime,
+                                                                              @QueryParam("taskType") String taskType,
+                                                                              @PathParam("counterName") String counterName,
+                                                                              @QueryParam("distRange") String distRange) {
+        MRTaskExecutionResponse.TaskDistributionResponse result = new MRTaskExecutionResponse.TaskDistributionResponse();
+        if (jobId == null || site == null) {
+            result.errMessage = "IllegalArgumentException: jobId or site is null";
+            return result;
+        }
+        // Honour the requested task type; fall back to MAP so callers that omit it see no change.
+        String queriedTaskType = (taskType == null || taskType.isEmpty()) ? Constants.TaskType.MAP.toString() : taskType;
+        String query = String.format("%s[@site=\"%s\" AND @jobId=\"%s\" AND @taskType=\"%s\"]{@jobCounters}",
+            Constants.JPA_TASK_EXECUTION_SERVICE_NAME, site, jobId, queriedTaskType);
+        GenericServiceAPIResponseEntity<TaskExecutionAPIEntity> response = ResourceUtils.getQueryResult(query, jobStartTime, jobEndTime);
+        if (!response.isSuccess() || response.getObj() == null) {
+            result.errMessage = response.getException();
+            return result;
+        }
+        try {
+            return taskCountImpl.getHistoryTaskDistribution(response.getObj(), counterName, distRange);
+        } catch (Exception e) {
+            // Report through the class logger instead of stderr; the caller still gets the message.
+            LOG.error("Fail to compute the task distribution of counter {} with query={}", counterName, query, e);
+            result.errMessage = e.getMessage();
+            return result;
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRTaskExecutionResponse.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRTaskExecutionResponse.java
 
b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRTaskExecutionResponse.java
new file mode 100644
index 0000000..c7cc258
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRTaskExecutionResponse.java
@@ -0,0 +1,85 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm;
+
+
+import org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class MRTaskExecutionResponse {
+    public String errMessage;
+
+    public static class TaskGroupResponse extends MRTaskExecutionResponse {
+        public Map<String, TaskGroup> tasksGroupByType;
+    }
+
+    public static class TaskGroup {
+        public List<TaskExecutionAPIEntity> longTasks;
+        public List<TaskExecutionAPIEntity> shortTasks;
+
+        public TaskGroup() {
+            longTasks = new ArrayList<>();
+            shortTasks = new ArrayList<>();
+        }
+    }
+
+    public static class JobSuggestionResponse extends MRTaskExecutionResponse {
+        public String suggestionType;
+        public List<SuggestionResult> suggestionResults;
+    }
+
+    public static class SuggestionResult {
+        public String name;
+        public double value;
+        public String suggestion;
+
+        public SuggestionResult(String name, double value, String suggestion) {
+            this.name = name;
+            this.value = value;
+            this.suggestion = suggestion;
+        }
+
+        public SuggestionResult(String name, double value) {
+            this.value = value;
+            this.name = name;
+        }
+    }
+
+    public static class TaskDistributionResponse extends 
MRTaskExecutionResponse {
+        public String counterName;
+        public List<CountUnit> taskBuckets;
+
+        public TaskDistributionResponse() {
+            taskBuckets = new ArrayList<>();
+        }
+    }
+
+    public static class CountUnit {
+        public long bucket;
+        public long countVal;
+
+        public CountUnit(long bucket) {
+            this.bucket = bucket;
+            this.countVal = 0;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/ResourceUtils.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/ResourceUtils.java
 
b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/ResourceUtils.java
new file mode 100644
index 0000000..381c7ec
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/ResourceUtils.java
@@ -0,0 +1,87 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm;
+
+import org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.generic.GenericEntityServiceResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Static helpers shared by the JPM REST resources: entity queries, counter extraction and
+ * distribution (bucket) handling.
+ */
+public class ResourceUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ResourceUtils.class);
+
+    static GenericEntityServiceResource resource = new GenericEntityServiceResource();
+
+    /**
+     * Runs an entity query through the shared {@link GenericEntityServiceResource}, with fixed
+     * values for every argument other than the query and the time range.
+     *
+     * @param query query string
+     * @param startTime start of the search window; some callers pass null
+     * @param endTime end of the search window; some callers pass null
+     * @return the raw (untyped) response of {@code GenericEntityServiceResource#search}
+     */
+    public static GenericServiceAPIResponseEntity getQueryResult(String query, String startTime, String endTime) {
+        return resource.search(query, startTime, endTime, Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false);
+    }
+
+    /**
+     * Extracts the value of one counter from every task.
+     *
+     * @param tasks tasks to read the counter from
+     * @param counterName counter to extract
+     * @return the counter values, in the iteration order of {@code tasks}
+     */
+    public static double[] getCounterValues(List<TaskExecutionAPIEntity> tasks, JobCounters.CounterName counterName) {
+        // Fill the array directly instead of boxing every value into an intermediate list.
+        double[] values = new double[tasks.size()];
+        int next = 0;
+        for (TaskExecutionAPIEntity task : tasks) {
+            values[next++] = Double.valueOf(task.getJobCounters().getCounterValue(counterName));
+        }
+        return values;
+    }
+
+    /**
+     * Copies a list of boxed doubles into a primitive array.
+     *
+     * @param input values to copy; must not contain null
+     * @return a new array holding the values in list order
+     */
+    public static double[] toArray(List<Double> input) {
+        double[] result = new double[input.size()];
+        for (int i = 0; i < result.length; i++) {
+            result[i] = input.get(i);
+        }
+        return result;
+    }
+
+    /**
+     * Parses a list of numbers separated by commas and/or whitespace, e.g. {@code "0, 30,60"}.
+     * Tokens that are not valid longs are logged and skipped; consecutive separators are treated
+     * as one, so they no longer produce empty tokens and spurious warnings.
+     *
+     * @param timelist the text to parse; null or blank yields an empty list
+     * @return the parsed numbers in input order, never null
+     */
+    public static List<Long> parseDistributionList(String timelist) {
+        List<Long> times = new ArrayList<>();
+        if (timelist == null) {
+            return times;
+        }
+        for (String token : timelist.trim().split("[,\\s]+")) {
+            if (token.isEmpty()) {
+                // Left over from a blank input or a leading separator.
+                continue;
+            }
+            try {
+                times.add(Long.parseLong(token));
+            } catch (NumberFormatException ex) {
+                LOG.warn("{} is not a number", token);
+            }
+        }
+        return times;
+    }
+
+    /**
+     * Finds the bucket a value falls into. Bucket {@code i} covers the values below
+     * {@code rangeList.get(i + 1)} that were not claimed by an earlier bucket; the last bucket
+     * takes everything else.
+     *
+     * @param rangeList bucket boundaries, expected in ascending order
+     * @param value value to place
+     * @return the bucket index, or -1 when {@code rangeList} is empty
+     */
+    public static int getDistributionPosition(List<Long> rangeList, Long value) {
+        for (int i = 1; i < rangeList.size(); i++) {
+            if (value < rangeList.get(i)) {
+                return i - 1;
+            }
+        }
+        return rangeList.size() - 1;
+    }
+
+    /**
+     * Tells whether a job id has already been seen.
+     *
+     * @param keys ids seen so far
+     * @param jobId id to look up
+     * @return true when {@code keys} contains {@code jobId}
+     */
+    public static boolean isDuplicate(Set<String> keys, String jobId) {
+        if (keys.isEmpty()) {
+            return false;
+        }
+        return keys.contains(jobId);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/TaskCountByDurationHelper.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/TaskCountByDurationHelper.java
 
b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/TaskCountByDurationHelper.java
deleted file mode 100644
index 0eeb440..0000000
--- 
a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/TaskCountByDurationHelper.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.eagle.service.jpm;
-
-import org.apache.eagle.jpm.mr.runningentity.TaskExecutionAPIEntity;
-import org.apache.eagle.jpm.util.Constants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-
-public class TaskCountByDurationHelper {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(TaskCountByDurationHelper.class);
-
-    public static List<Long> parseTimeList(String timelist) {
-        List<Long> times = new ArrayList<>();
-        String [] strs = timelist.split("[,\\s]");
-        for (String str : strs) {
-            try {
-                times.add(Long.parseLong(str));
-            } catch (Exception ex) {
-                LOG.warn(str + " is not a number");
-            }
-        }
-        return times;
-    }
-
-    public static int getPosition(List<Long> times, Long duration) {
-        duration = duration / 1000;
-        for (int i = 1; i < times.size(); i++) {
-            if (duration < times.get(i)) {
-                return i - 1;
-            }
-        }
-        return times.size() - 1;
-    }
-
-    public void getTopTasks(List<MRJobTaskCountResponse.UnitTaskCount> list, 
long top) {
-        for (MRJobTaskCountResponse.UnitTaskCount taskCounter : list) {
-            Iterator<TaskExecutionAPIEntity> iterator = 
taskCounter.entities.iterator();
-            for (int i = 0; i < top && iterator.hasNext(); i++) {
-                taskCounter.topEntities.add(iterator.next());
-            }
-            taskCounter.entities.clear();
-        }
-    }
-
-    public void countTask(MRJobTaskCountResponse.UnitTaskCount counter, String 
taskType) {
-        counter.taskCount++;
-        if (taskType.equalsIgnoreCase(Constants.TaskType.MAP.toString())) {
-            counter.mapTaskCount++;
-        } else if 
(taskType.equalsIgnoreCase(Constants.TaskType.REDUCE.toString())) {
-            counter.reduceTaskCount++;
-        }
-    }
-
-    public void initTaskCountList(List<MRJobTaskCountResponse.UnitTaskCount> 
runningTaskCount,
-                                  List<MRJobTaskCountResponse.UnitTaskCount> 
finishedTaskCount,
-                                  List<Long> times,
-                                  Comparator comparator) {
-        for (int i = 0; i < times.size(); i++) {
-            runningTaskCount.add(new 
MRJobTaskCountResponse.UnitTaskCount(times.get(i), comparator));
-            finishedTaskCount.add(new 
MRJobTaskCountResponse.UnitTaskCount(times.get(i), comparator));
-        }
-    }
-
-    static class RunningTaskComparator implements 
Comparator<TaskExecutionAPIEntity> {
-        @Override
-        public int compare(TaskExecutionAPIEntity o1, TaskExecutionAPIEntity 
o2) {
-            Long time1 = o1.getDuration();
-            Long time2 = o2.getDuration();
-            return (time1 > time2 ? -1 : (time1 == time2) ? 0 : 1);
-        }
-    }
-
-    static class HistoryTaskComparator implements 
Comparator<org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity> {
-        @Override
-        public int 
compare(org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity o1,
-                           
org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity o2) {
-            Long time1 = o1.getDuration();
-            Long time2 = o2.getDuration();
-            return (time1 > time2 ? -1 : (time1 == time2) ? 0 : 1);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/count/MRJobCountImpl.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/count/MRJobCountImpl.java
 
b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/count/MRJobCountImpl.java
new file mode 100644
index 0000000..d8fc004
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/count/MRJobCountImpl.java
@@ -0,0 +1,146 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm.count;
+
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.MRJobTagName;
+import org.apache.eagle.service.jpm.MRJobTaskCountResponse.JobCountResponse;
+import org.apache.eagle.service.jpm.MRJobTaskCountResponse.UnitJobCount;
+import org.apache.eagle.service.jpm.ResourceUtils;
+
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Helpers that fold MapReduce job entities into per-time-bucket and per-duration-bucket counters.
+ */
+public class MRJobCountImpl {
+
+    /**
+     * Appends one empty {@link UnitJobCount} per interval to {@code jobCounts}.
+     *
+     * <p>The first bucket starts at {@code startTime} rounded down to a multiple of
+     * {@code intervalInSecs}; buckets are appended while the bucket start is not after
+     * {@code endTime}. Each bucket is keyed by its start in seconds multiplied by
+     * {@code DateTimeUtil.ONESECOND}.
+     *
+     * @param jobCounts      list that receives the new buckets
+     * @param startTime      start of the range, in seconds
+     * @param endTime        end of the range, in seconds
+     * @param intervalInSecs bucket width, in seconds; must be positive
+     * @throws IllegalArgumentException if {@code intervalInSecs} is not positive
+     */
+    public void initJobCountList(List<UnitJobCount> jobCounts, long startTime, long endTime, long intervalInSecs) {
+        if (intervalInSecs <= 0) {
+            // Zero would divide by zero below; a negative width keeps the loop condition true.
+            throw new IllegalArgumentException("intervalInSecs must be positive, but was " + intervalInSecs);
+        }
+        for (long i = startTime / intervalInSecs; i * intervalInSecs <= endTime; i++) {
+            jobCounts.add(new UnitJobCount(i * intervalInSecs * DateTimeUtil.ONESECOND));
+        }
+    }
+
+    /**
+     * Returns the human-readable date that lies 24 hours <em>before</em> {@code startTime}.
+     *
+     * <p>NOTE(review): despite the name, the time is moved one day earlier, not later.
+     *
+     * @param startTime date string accepted by {@code DateTimeUtil.humanDateToSeconds}
+     * @return the shifted date, formatted by {@code DateTimeUtil.secondsToHumanDate}
+     * @throws ParseException if {@code startTime} cannot be parsed
+     */
+    public String moveTimeForwardOneDay(String startTime) throws ParseException {
+        long timeInSecs = DateTimeUtil.humanDateToSeconds(startTime);
+        timeInSecs -= 24L * 60L * 60L;
+        return DateTimeUtil.secondsToHumanDate(timeInSecs);
+    }
+
+    /**
+     * Counts, for every time bucket between the given bounds, the jobs that were running in it.
+     *
+     * <p>History jobs contribute from their start time to their end time, but only when they ended
+     * at or after the requested start. Running jobs contribute from their start time to
+     * {@code endTimeInSecs}, unless {@code ResourceUtils.isDuplicate} reports their job id as
+     * already collected from the history list.
+     *
+     * @param historyJobs     finished jobs
+     * @param runningJobs     jobs still in progress
+     * @param startTimeInSecs start of the range, in seconds
+     * @param endTimeInSecs   end of the range, in seconds
+     * @param intervalInSecs  bucket width, in seconds; must be positive
+     * @return the bucket counters together with the set of job types encountered
+     */
+    public JobCountResponse getRunningJobCount(List<JobExecutionAPIEntity> historyJobs,
+                                               List<org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity> runningJobs,
+                                               long startTimeInSecs,
+                                               long endTimeInSecs,
+                                               long intervalInSecs) {
+        List<UnitJobCount> jobCounts = new ArrayList<>();
+        Set<String> jobTypes = new HashSet<>();
+        Set<String> jobIds = new HashSet<>();
+        initJobCountList(jobCounts, startTimeInSecs, endTimeInSecs, intervalInSecs);
+        long startTimeInMs = startTimeInSecs * DateTimeUtil.ONESECOND;
+        for (JobExecutionAPIEntity job : historyJobs) {
+            // Every history job id is remembered, even when the job itself is filtered out below.
+            jobIds.add(job.getTags().get(MRJobTagName.JOB_ID.toString()));
+            if (job.getEndTime() >= startTimeInMs) {
+                String jobType = job.getTags().get(MRJobTagName.JOB_TYPE.toString());
+                jobTypes.add(jobType);
+                countJob(jobCounts, job.getStartTime() / DateTimeUtil.ONESECOND,
+                    job.getEndTime() / DateTimeUtil.ONESECOND, intervalInSecs, jobType);
+            }
+        }
+        for (org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity job : runningJobs) {
+            if (!ResourceUtils.isDuplicate(jobIds, job.getTags().get(MRJobTagName.JOB_ID.toString()))) {
+                String jobType = job.getTags().get(MRJobTagName.JOB_TYPE.toString());
+                jobTypes.add(jobType);
+                // A running job has no end time yet, so it is counted up to the end of the range.
+                countJob(jobCounts, job.getStartTime() / DateTimeUtil.ONESECOND, endTimeInSecs, intervalInSecs, jobType);
+            }
+        }
+        JobCountResponse response = new JobCountResponse();
+        response.jobCounts = jobCounts;
+        response.jobTypes = jobTypes;
+        return response;
+    }
+
+    /**
+     * Groups finished jobs into duration buckets.
+     *
+     * @param jobDurations jobs whose duration is to be classified
+     * @param timeList     bucket boundaries, in the format understood by
+     *                     {@code ResourceUtils.parseDistributionList}
+     * @return one counter per parsed boundary plus the set of job types encountered
+     */
+    public JobCountResponse getHistoryJobCountGroupByDuration(List<JobExecutionAPIEntity> jobDurations, String timeList) {
+        JobCountResponse response = new JobCountResponse();
+        List<UnitJobCount> jobCounts = new ArrayList<>();
+        Set<String> jobTypes = new HashSet<>();
+        List<Long> times = ResourceUtils.parseDistributionList(timeList);
+        for (Long time : times) {
+            jobCounts.add(new UnitJobCount(time));
+        }
+        for (JobExecutionAPIEntity job : jobDurations) {
+            int jobIndex = ResourceUtils.getDistributionPosition(times, job.getDurationTime() / DateTimeUtil.ONESECOND);
+            UnitJobCount counter = jobCounts.get(jobIndex);
+            String jobType = job.getTags().get(MRJobTagName.JOB_TYPE.toString());
+            jobTypes.add(jobType);
+            countJob(counter, jobType);
+        }
+        response.jobCounts = jobCounts;
+        response.jobTypes = jobTypes;
+        return response;
+    }
+
+    /**
+     * Adds one job of the given type to {@code counter}.
+     *
+     * @param counter bucket to update
+     * @param jobType job type; {@code null} is counted under the literal key {@code "null"}
+     */
+    public void countJob(UnitJobCount counter, String jobType) {
+        if (null == jobType) {
+            jobType = "null";
+        }
+        counter.jobCount++;
+        // Single lookup instead of containsKey + get.
+        Long current = counter.jobCountByType.get(jobType);
+        counter.jobCountByType.put(jobType, current == null ? 1L : current + 1);
+    }
+
+    /**
+     * Adds one job to every bucket whose start time lies within the job's lifetime.
+     *
+     * <p>A job that starts in the middle of a bucket is first counted in the following bucket
+     * (the start index is rounded up). Nothing is counted when {@code jobCounts} is empty or the
+     * job ended before the first bucket.
+     *
+     * @param jobCounts        buckets produced by {@link #initJobCountList}
+     * @param jobStartTimeSecs job start, in seconds
+     * @param jobEndTimeSecs   job end, in seconds
+     * @param intervalInSecs   bucket width, in seconds
+     * @param jobType          job type forwarded to {@link #countJob(UnitJobCount, String)}
+     */
+    public void countJob(List<UnitJobCount> jobCounts, long jobStartTimeSecs, long jobEndTimeSecs, long intervalInSecs, String jobType) {
+        if (jobCounts.isEmpty()) {
+            // No bucket to update; avoids IndexOutOfBoundsException on get(0).
+            return;
+        }
+        long startCountPoint = jobCounts.get(0).timeBucket / DateTimeUtil.ONESECOND;
+        if (jobEndTimeSecs < startCountPoint) {
+            return;
+        }
+        long startIndex = 0;
+        if (jobStartTimeSecs > startCountPoint) {
+            long relativeStartTime = jobStartTimeSecs - startCountPoint;
+            // Round up: the bucket in which the job started is only counted if it started exactly on it.
+            startIndex = relativeStartTime / intervalInSecs + (relativeStartTime % intervalInSecs == 0 ? 0 : 1);
+        }
+        // Bound the index in long arithmetic before narrowing to int.
+        long lastIndex = Math.min((jobEndTimeSecs - startCountPoint) / intervalInSecs, jobCounts.size() - 1L);
+
+        for (long i = startIndex; i <= lastIndex; i++) {
+            countJob(jobCounts.get((int) i), jobType);
+        }
+    }
+
+    /**
+     * Computes the time span covered by the given jobs.
+     *
+     * @param jobEntities jobs to inspect
+     * @return a two-element list: the earliest start time and the latest end time, both formatted by
+     *         {@code DateTimeUtil.millisecondsToHumanDateWithSeconds}. For an empty input the first
+     *         element is the current time and the second is time {@code 0}.
+     */
+    public List<String> getSearchTimeDuration(List<JobExecutionAPIEntity> jobEntities) {
+        List<String> pair = new ArrayList<>();
+        long minStartTime = System.currentTimeMillis();
+        long maxEndTime = 0;
+        for (JobExecutionAPIEntity jobEntity : jobEntities) {
+            minStartTime = Math.min(minStartTime, jobEntity.getStartTime());
+            maxEndTime = Math.max(maxEndTime, jobEntity.getEndTime());
+        }
+        pair.add(DateTimeUtil.millisecondsToHumanDateWithSeconds(minStartTime));
+        pair.add(DateTimeUtil.millisecondsToHumanDateWithSeconds(maxEndTime));
+        return pair;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/count/MRTaskCountImpl.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/count/MRTaskCountImpl.java
 
b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/count/MRTaskCountImpl.java
new file mode 100644
index 0000000..f5017eb
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/count/MRTaskCountImpl.java
@@ -0,0 +1,124 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm.count;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.jpm.mr.runningentity.TaskExecutionAPIEntity;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.MRJobTagName;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+import org.apache.eagle.service.jpm.MRJobTaskCountResponse;
+import org.apache.eagle.service.jpm.MRTaskExecutionResponse;
+import org.apache.eagle.service.jpm.ResourceUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * Helpers that count MapReduce tasks per minute and distribute them over counter-value buckets.
+ */
+public class MRTaskCountImpl {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MRTaskCountImpl.class);
+
+    /**
+     * For every counter, moves the first {@code top} elements of {@code entities} (in the iteration
+     * order of that collection) into {@code topEntities} and then clears {@code entities}.
+     *
+     * @param list counters to process
+     * @param top  maximum number of entities kept per counter
+     */
+    public void getTopTasks(List<MRJobTaskCountResponse.UnitTaskCount> list, long top) {
+        for (MRJobTaskCountResponse.UnitTaskCount taskCounter : list) {
+            Iterator<TaskExecutionAPIEntity> iterator = taskCounter.entities.iterator();
+            for (int i = 0; i < top && iterator.hasNext(); i++) {
+                taskCounter.topEntities.add(iterator.next());
+            }
+            taskCounter.entities.clear();
+        }
+    }
+
+    /**
+     * Adds one task to {@code counter} and to its map or reduce sub-counter.
+     *
+     * @param counter  counter to update
+     * @param taskType task type compared case-insensitively with {@code MAP} and {@code REDUCE};
+     *                 any other value, including {@code null}, only increases the total
+     */
+    public void countTask(MRJobTaskCountResponse.UnitTaskCount counter, String taskType) {
+        counter.taskCount++;
+        // Constant on the left so a missing task-type tag does not raise NullPointerException.
+        if (Constants.TaskType.MAP.toString().equalsIgnoreCase(taskType)) {
+            counter.mapTaskCount++;
+        } else if (Constants.TaskType.REDUCE.toString().equalsIgnoreCase(taskType)) {
+            counter.reduceTaskCount++;
+        }
+    }
+
+    /**
+     * Appends, for every entry of {@code times}, a fresh counter to both result lists.
+     *
+     * @param runningTaskCount  list receiving counters for running tasks
+     * @param finishedTaskCount list receiving counters for finished tasks
+     * @param times             bucket values handed to each new counter
+     * @param comparator        comparator handed to each new counter
+     */
+    public void initTaskCountList(List<MRJobTaskCountResponse.UnitTaskCount> runningTaskCount,
+                                  List<MRJobTaskCountResponse.UnitTaskCount> finishedTaskCount,
+                                  List<Long> times,
+                                  Comparator comparator) {
+        for (Long time : times) {
+            runningTaskCount.add(new MRJobTaskCountResponse.UnitTaskCount(time, comparator));
+            finishedTaskCount.add(new MRJobTaskCountResponse.UnitTaskCount(time, comparator));
+        }
+    }
+
+    /** Orders running tasks by duration, longest first. */
+    public static class RunningTaskComparator implements Comparator<TaskExecutionAPIEntity> {
+        @Override
+        public int compare(TaskExecutionAPIEntity o1, TaskExecutionAPIEntity o2) {
+            // Arguments are swapped to obtain descending order.
+            return Long.compare(o2.getDuration(), o1.getDuration());
+        }
+    }
+
+    /** Orders history tasks by duration, longest first. */
+    public static class HistoryTaskComparator implements Comparator<org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity> {
+        @Override
+        public int compare(org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity o1,
+                           org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity o2) {
+            // Arguments are swapped to obtain descending order.
+            return Long.compare(o2.getDuration(), o1.getDuration());
+        }
+    }
+
+    /**
+     * Counts, for every minute in {@code [startTimeInMin, endTimeInMin]}, the tasks alive in it.
+     *
+     * <p>The part of a task's lifetime that falls outside the window is ignored.
+     *
+     * @param tasks          finished tasks
+     * @param startTimeInMin first minute of the window
+     * @param endTimeInMin   last minute of the window (inclusive)
+     * @return one counter per minute of the window
+     */
+    public MRJobTaskCountResponse.HistoryTaskCountResponse countHistoryTask(
+        List<org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity> tasks, long startTimeInMin, long endTimeInMin) {
+        List<MRJobTaskCountResponse.UnitTaskCount> taskCounts = new ArrayList<>();
+        for (long i = startTimeInMin; i <= endTimeInMin; i++) {
+            taskCounts.add(new MRJobTaskCountResponse.UnitTaskCount(i * DateTimeUtil.ONEMINUTE, null));
+        }
+        long lastBucket = taskCounts.size() - 1L;
+        for (org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity task : tasks) {
+            String taskType = task.getTags().get(MRJobTagName.TASK_TYPE.toString());
+            long taskStartTimeMin = task.getStartTime() / DateTimeUtil.ONEMINUTE;
+            long taskEndTimeMin = task.getEndTime() / DateTimeUtil.ONEMINUTE;
+            // Clamp to the window: a task may start before it or end after it, and an unclamped
+            // index would raise IndexOutOfBoundsException.
+            long firstBucket = Math.max(taskStartTimeMin - startTimeInMin, 0L);
+            long endBucket = Math.min(taskEndTimeMin - startTimeInMin, lastBucket);
+            for (long i = firstBucket; i <= endBucket; i++) {
+                countTask(taskCounts.get((int) i), taskType);
+            }
+        }
+        MRJobTaskCountResponse.HistoryTaskCountResponse response = new MRJobTaskCountResponse.HistoryTaskCountResponse();
+        response.taskCount = taskCounts;
+        return response;
+    }
+
+    /**
+     * Distributes tasks over buckets according to the value of one job counter.
+     *
+     * @param tasks       finished tasks
+     * @param counterName name of a {@code JobCounters.CounterName} constant, in any letter case
+     * @param distRange   bucket boundaries, in the format understood by
+     *                    {@code ResourceUtils.parseDistributionList}
+     * @return the number of tasks per bucket
+     * @throws IllegalArgumentException if {@code counterName} does not name a known counter
+     */
+    public MRTaskExecutionResponse.TaskDistributionResponse getHistoryTaskDistribution(
+        List<org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity> tasks, String counterName, String distRange) {
+        MRTaskExecutionResponse.TaskDistributionResponse response = new MRTaskExecutionResponse.TaskDistributionResponse();
+        response.counterName = counterName;
+        List<Long> distRangeList = ResourceUtils.parseDistributionList(distRange);
+        for (Long boundary : distRangeList) {
+            response.taskBuckets.add(new MRTaskExecutionResponse.CountUnit(boundary));
+        }
+        // Locale.ROOT keeps the enum lookup independent of the JVM default locale.
+        JobCounters.CounterName jobCounterName = JobCounters.CounterName.valueOf(counterName.toUpperCase(Locale.ROOT));
+        for (org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity task : tasks) {
+            Long counterValue = task.getJobCounters().getCounterValue(jobCounterName);
+            int pos = ResourceUtils.getDistributionPosition(distRangeList, counterValue);
+            response.taskBuckets.get(pos).countVal++;
+        }
+        return response;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/AbstractGCFunc.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/AbstractGCFunc.java
 
b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/AbstractGCFunc.java
new file mode 100644
index 0000000..ad38fcb
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/AbstractGCFunc.java
@@ -0,0 +1,104 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm.suggestion;
+
+import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
+import org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+import org.apache.eagle.service.jpm.MRTaskExecutionResponse;
+import org.apache.eagle.service.jpm.MRTaskExecutionResponse.TaskGroup;
+import 
org.apache.eagle.service.jpm.MRTaskExecutionResponse.JobSuggestionResponse;
+import org.apache.eagle.service.jpm.MRTaskExecutionResponse.TaskGroupResponse;
+import org.apache.eagle.service.jpm.ResourceUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Suggestion function that compares the GC-time / CPU-time ratio of the long-running tasks of a
+ * group with the ratio of its short-running tasks.
+ */
+public abstract class AbstractGCFunc implements SuggestionFunc {
+
+    private static final String GC_RATIO_NAME_FORMAT = "gcRatio (%s / %s) deviation";
+    private static final String GC_SUGGESTION_FORMAT =
+        "gcRatio deviation exceeds threshold %.2f, where the deviation is %.2f / %.2f";
+
+    private static final double GC_RATIO_DEVIATION_THRESHOLD = 2;
+
+    private final Constants.SuggestionType suggestionType;
+    private final double threshold;
+
+    /**
+     * Creates the function with the default deviation threshold.
+     *
+     * @param suggestionType type reported in the response
+     */
+    public AbstractGCFunc(Constants.SuggestionType suggestionType) {
+        this(suggestionType, GC_RATIO_DEVIATION_THRESHOLD);
+    }
+
+    /**
+     * Creates the function with a custom deviation threshold.
+     *
+     * @param suggestionType type reported in the response
+     * @param threshold      deviation above which a suggestion is produced; a non-positive value
+     *                       falls back to the default threshold
+     */
+    public AbstractGCFunc(Constants.SuggestionType suggestionType, double threshold) {
+        this.suggestionType = suggestionType;
+        this.threshold = threshold > 0 ? threshold : GC_RATIO_DEVIATION_THRESHOLD;
+    }
+
+    /**
+     * Selects the task group this function evaluates.
+     *
+     * @param tasks all task groups of the job
+     * @return the group to evaluate
+     */
+    protected abstract TaskGroup getTasks(TaskGroupResponse tasks);
+
+    /**
+     * Returns average GC milliseconds divided by average CPU milliseconds of the given tasks,
+     * or {@code 0} for an empty list. An average CPU time of zero is replaced by one to avoid
+     * dividing by zero.
+     */
+    private double getGcRatio(List<TaskExecutionAPIEntity> tasks) {
+        if (tasks.isEmpty()) {
+            return 0;
+        }
+        double[] gcMs = ResourceUtils.getCounterValues(tasks, JobCounters.CounterName.GC_MILLISECONDS);
+        double[] cpuMs = ResourceUtils.getCounterValues(tasks, JobCounters.CounterName.CPU_MILLISECONDS);
+
+        DescriptiveStatistics statistics = new DescriptiveStatistics();
+        double averageCpuMs = statistics.getMeanImpl().evaluate(cpuMs);
+        double averageGcMs = statistics.getMeanImpl().evaluate(gcMs);
+        if (averageCpuMs == 0) {
+            averageCpuMs = 1;
+        }
+        return averageGcMs / averageCpuMs;
+    }
+
+    /**
+     * Evaluates the GC ratio deviation between long and short tasks.
+     *
+     * @param data all task groups of the job
+     * @return a response carrying the suggestion type; suggestion results are only set when the
+     *         selected group exists and has at least one long task
+     */
+    @Override
+    public JobSuggestionResponse apply(TaskGroupResponse data) {
+        JobSuggestionResponse response = new JobSuggestionResponse();
+        response.suggestionType = suggestionType.name();
+
+        TaskGroup taskGroup = getTasks(data);
+        // The group may be absent altogether (e.g. a job without tasks of the requested type).
+        if (taskGroup == null || taskGroup.longTasks.isEmpty()) {
+            return response;
+        }
+
+        double smallerGcRatio = getGcRatio(taskGroup.shortTasks);
+        double largerGcRatio = getGcRatio(taskGroup.longTasks);
+        response.suggestionResults = getGCsuggest(smallerGcRatio, largerGcRatio);
+        return response;
+    }
+
+    /**
+     * Builds the single result entry: the deviation {@code largerRatio / smallerRatio} and, when it
+     * exceeds the threshold, a textual suggestion. A non-positive {@code smallerRatio} is replaced
+     * by one so the division is defined.
+     */
+    private List<MRTaskExecutionResponse.SuggestionResult> getGCsuggest(double smallerRatio, double largerRatio) {
+        if (smallerRatio <= 0) {
+            smallerRatio = 1;
+        }
+        double deviation = largerRatio / smallerRatio;
+        String suggestName = String.format(GC_RATIO_NAME_FORMAT,
+            JobCounters.CounterName.GC_MILLISECONDS.getName(),
+            JobCounters.CounterName.CPU_MILLISECONDS.getName());
+        String suggestion = null;
+        if (deviation > threshold) {
+            suggestion = String.format(GC_SUGGESTION_FORMAT, threshold, largerRatio, smallerRatio);
+        }
+        List<MRTaskExecutionResponse.SuggestionResult> suggestionResults = new ArrayList<>();
+        suggestionResults.add(new MRTaskExecutionResponse.SuggestionResult(suggestName, deviation, suggestion));
+        return suggestionResults;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/AbstractInputFunc.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/AbstractInputFunc.java
 
b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/AbstractInputFunc.java
new file mode 100644
index 0000000..5a89055
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/AbstractInputFunc.java
@@ -0,0 +1,86 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm.suggestion;
+
+import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+import org.apache.eagle.service.jpm.MRTaskExecutionResponse;
+import 
org.apache.eagle.service.jpm.MRTaskExecutionResponse.JobSuggestionResponse;
+import org.apache.eagle.service.jpm.MRTaskExecutionResponse.TaskGroupResponse;
+import org.apache.eagle.service.jpm.ResourceUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Suggestion function that compares the average value of one job counter between the
+ * long-running and the short-running tasks of a group.
+ */
+public abstract class AbstractInputFunc implements SuggestionFunc {
+
+    private final JobCounters.CounterName counterName;
+    private final Constants.SuggestionType suggestType;
+    private final double threshold;
+    private static final long DATA_SKEW_THRESHOLD = 2;
+
+    private static final String DEVIATION_SUGGEST_FORMAT = "average %s deviation";
+    private static final String DATA_SKEW_SUGGESTION_FORMAT =
+        "%s deviation exceeds threshold %.2f, where the deviation is %.2f / %.2f";
+
+    /**
+     * Creates the function with the default deviation threshold.
+     *
+     * @param counterName counter whose averages are compared
+     * @param type        type reported in the response
+     */
+    public AbstractInputFunc(JobCounters.CounterName counterName, Constants.SuggestionType type) {
+        this(counterName, type, DATA_SKEW_THRESHOLD);
+    }
+
+    /**
+     * Creates the function with a custom deviation threshold.
+     *
+     * @param counterName counter whose averages are compared
+     * @param type        type reported in the response
+     * @param threshold   deviation above which a suggestion is produced; a non-positive value
+     *                    falls back to the default threshold
+     */
+    public AbstractInputFunc(JobCounters.CounterName counterName, Constants.SuggestionType type, double threshold) {
+        this.counterName = counterName;
+        this.suggestType = type;
+        this.threshold = threshold > 0 ? threshold : DATA_SKEW_THRESHOLD;
+    }
+
+    /**
+     * Selects the task group this function evaluates.
+     *
+     * @param tasks all task groups of the job
+     * @return the group to evaluate
+     */
+    protected abstract MRTaskExecutionResponse.TaskGroup getTasks(TaskGroupResponse tasks);
+
+    /**
+     * Evaluates the deviation of the counter average between long and short tasks.
+     *
+     * @param data all task groups of the job
+     * @return a response carrying the suggestion type; suggestion results are only set when the
+     *         selected group exists and has at least one long task
+     */
+    @Override
+    public JobSuggestionResponse apply(TaskGroupResponse data) {
+        JobSuggestionResponse response = new JobSuggestionResponse();
+        response.suggestionType = suggestType.toString();
+
+        MRTaskExecutionResponse.TaskGroup taskGroup = getTasks(data);
+        // Without long tasks there is nothing to compare; averaging an empty array would give NaN.
+        if (taskGroup == null || taskGroup.longTasks.isEmpty()) {
+            return response;
+        }
+
+        double avgSmaller = average(ResourceUtils.getCounterValues(taskGroup.shortTasks, counterName));
+        double avgLarger = average(ResourceUtils.getCounterValues(taskGroup.longTasks, counterName));
+        response.suggestionResults = getDeviationSuggest(avgSmaller, avgLarger);
+        return response;
+    }
+
+    /** Returns the arithmetic mean of {@code values}, or {@code 0} for an empty array instead of NaN. */
+    private static double average(double[] values) {
+        if (values.length == 0) {
+            return 0;
+        }
+        return new DescriptiveStatistics().getMeanImpl().evaluate(values);
+    }
+
+    /**
+     * Builds the single result entry: the deviation {@code avgLarger / avgSmaller} and, when it
+     * exceeds the threshold, a textual suggestion. A non-positive {@code avgSmaller} is replaced
+     * by one so the division is defined.
+     */
+    private List<MRTaskExecutionResponse.SuggestionResult> getDeviationSuggest(double avgSmaller, double avgLarger) {
+        if (avgSmaller <= 0) {
+            avgSmaller = 1;
+        }
+        double deviation = avgLarger / avgSmaller;
+        String suggestName = String.format(DEVIATION_SUGGEST_FORMAT, counterName.getName());
+        String suggestion = null;
+        if (deviation > threshold) {
+            suggestion = String.format(DATA_SKEW_SUGGESTION_FORMAT, counterName.getName(), threshold, avgLarger, avgSmaller);
+        }
+        List<MRTaskExecutionResponse.SuggestionResult> suggestionResults = new ArrayList<>();
+        suggestionResults.add(new MRTaskExecutionResponse.SuggestionResult(suggestName, deviation, suggestion));
+        return suggestionResults;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/MapGCFunc.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/MapGCFunc.java
 
b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/MapGCFunc.java
new file mode 100644
index 0000000..9fd54b0
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/MapGCFunc.java
@@ -0,0 +1,38 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm.suggestion;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.service.jpm.MRTaskExecutionResponse;
+
+/**
+ * GC-ratio suggestion function bound to the map tasks of a job.
+ */
+public class MapGCFunc extends AbstractGCFunc {
+
+    /** Creates the function with the threshold chosen by the superclass. */
+    public MapGCFunc() {
+        super(Constants.SuggestionType.MapGC);
+    }
+
+    /**
+     * Creates the function with a caller-supplied threshold.
+     *
+     * @param threshold deviation threshold forwarded to the superclass
+     */
+    public MapGCFunc(double threshold) {
+        super(Constants.SuggestionType.MapGC, threshold);
+    }
+
+    /**
+     * Looks up the task group registered under the {@code MAP} task type.
+     *
+     * @param tasks all task groups of the job
+     * @return the entry stored for the map task type
+     */
+    @Override
+    protected MRTaskExecutionResponse.TaskGroup getTasks(MRTaskExecutionResponse.TaskGroupResponse tasks) {
+        final String mapTypeKey = Constants.TaskType.MAP.toString();
+        return tasks.tasksGroupByType.get(mapTypeKey);
+    }
+}


Reply via email to