Repository: asterixdb Updated Branches: refs/heads/master 40ed45f89 -> 0e2a7af3a
[ASTERIXDB-2197][FAIL] Abort Job on Failures in NotifyTaskCompleteWork - user model changes: no - storage format changes: no - interface changes: no Details: - Abort the job if any unexpected failure happens on NotifyTaskCompleteWork to ensure that the job will not be waiting forever. Change-Id: I60c911c7aae872ee6b94e68efa53638207c0180d Reviewed-on: https://asterix-gerrit.ics.uci.edu/2218 Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Till Westmann <ti...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/0e2a7af3 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/0e2a7af3 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/0e2a7af3 Branch: refs/heads/master Commit: 0e2a7af3acc058e7247ceccfd116feb612cdf120 Parents: 40ed45f Author: Murtadha Hubail <mhub...@apache.org> Authored: Tue Dec 12 06:45:24 2017 +0300 Committer: Murtadha Hubail <mhub...@apache.org> Committed: Wed Dec 13 09:16:30 2017 -0800 ---------------------------------------------------------------------- .../control/cc/executor/JobExecutor.java | 48 +++++++++++--------- .../control/cc/work/TaskCompleteWork.java | 27 +++++------ 2 files changed, 38 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e2a7af3/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java index 01201a6..ab7a3db 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java @@ -642,27 +642,32 @@ public class JobExecutor { return doomed; } - public void notifyTaskComplete(TaskAttempt ta) throws HyracksException { - TaskAttemptId taId = ta.getTaskAttemptId(); - TaskCluster tc = ta.getTask().getTaskCluster(); - TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc); - if (lastAttempt == null || taId.getAttempt() != lastAttempt.getAttempt()) { - LOGGER.warning( - "Ignoring task complete notification: " + taId + " -- Current last attempt = " + lastAttempt); - return; - } - TaskAttempt.TaskStatus taStatus = ta.getStatus(); - if (taStatus != TaskAttempt.TaskStatus.RUNNING) { - LOGGER.warning("Spurious task complete notification: " + taId + " Current state = " + taStatus); - return; - } - ta.setStatus(TaskAttempt.TaskStatus.COMPLETED, null); - ta.setEndTime(System.currentTimeMillis()); - if (lastAttempt.decrementPendingTasksCounter() == 0) { - lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.COMPLETED); - lastAttempt.setEndTime(System.currentTimeMillis()); - inProgressTaskClusters.remove(tc); - startRunnableActivityClusters(); + public void notifyTaskComplete(TaskAttempt ta) { + try { + TaskAttemptId taId = ta.getTaskAttemptId(); + TaskCluster tc = ta.getTask().getTaskCluster(); + TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc); + if (lastAttempt == null || taId.getAttempt() != lastAttempt.getAttempt()) { + LOGGER.warning(() -> "Ignoring task complete notification: " + taId + " -- Current last attempt = " + + lastAttempt); + return; + } + TaskAttempt.TaskStatus taStatus = ta.getStatus(); + if (taStatus != TaskAttempt.TaskStatus.RUNNING) { + LOGGER.warning(() -> "Spurious task complete notification: " + taId + " Current state = " + taStatus); + return; + } + ta.setStatus(TaskAttempt.TaskStatus.COMPLETED, null); + ta.setEndTime(System.currentTimeMillis()); + if (lastAttempt.decrementPendingTasksCounter() == 0) { + lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.COMPLETED); + lastAttempt.setEndTime(System.currentTimeMillis()); + inProgressTaskClusters.remove(tc); + startRunnableActivityClusters(); + } + } catch (Exception e) { + LOGGER.log(Level.SEVERE, e, () -> "Unexpected failure. Aborting job " + jobRun.getJobId()); + abortJob(Collections.singletonList(e), NoOpCallback.INSTANCE); } } @@ -724,6 +729,7 @@ public class JobExecutor { ta -> HyracksException.create(ErrorCode.NODE_FAILED, ta.getNodeId())); startRunnableActivityClusters(); } catch (Exception e) { + LOGGER.log(Level.SEVERE, e, () -> "Unexpected failure. Aborting job " + jobRun.getJobId()); abortJob(Collections.singletonList(e), NoOpCallback.INSTANCE); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e2a7af3/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java index f4f2f52..e2f8b0d 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java @@ -21,7 +21,6 @@ package org.apache.hyracks.control.cc.work; import java.util.Map; import org.apache.hyracks.api.dataflow.TaskAttemptId; -import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.cc.job.IJobManager; @@ -42,23 +41,19 @@ public class TaskCompleteWork extends AbstractTaskLifecycleWork { @Override protected void performEvent(TaskAttempt ta) { - try { - IJobManager jobManager = ccs.getJobManager(); - JobRun run = jobManager.get(jobId); - if (statistics != null) { - JobProfile jobProfile = run.getJobProfile(); - Map<String, JobletProfile> jobletProfiles = jobProfile.getJobletProfiles(); - JobletProfile jobletProfile = jobletProfiles.get(nodeId); - if (jobletProfile == null) { - jobletProfile = new JobletProfile(nodeId); - jobletProfiles.put(nodeId, jobletProfile); - } - jobletProfile.getTaskProfiles().put(taId, statistics); + IJobManager jobManager = ccs.getJobManager(); + JobRun run = jobManager.get(jobId); + if (statistics != null) { + JobProfile jobProfile = run.getJobProfile(); + Map<String, JobletProfile> jobletProfiles = jobProfile.getJobletProfiles(); + JobletProfile jobletProfile = jobletProfiles.get(nodeId); + if (jobletProfile == null) { + jobletProfile = new JobletProfile(nodeId); + jobletProfiles.put(nodeId, jobletProfile); } - run.getExecutor().notifyTaskComplete(ta); - } catch (HyracksException e) { - e.printStackTrace(); + jobletProfile.getTaskProfiles().put(taId, statistics); } + run.getExecutor().notifyTaskComplete(ta); } @Override