Repository: asterixdb
Updated Branches:
  refs/heads/master 40ed45f89 -> 0e2a7af3a


[ASTERIXDB-2197][FAIL] Abort Job on Failures in NotifyTaskCompleteWork

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
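- Abort the job if any unexpected failure happens while
  processing a task-complete notification
  (JobExecutor.notifyTaskComplete, invoked from TaskCompleteWork),
  so that the job does not wait forever.
- Log unexpected failures at SEVERE level before aborting the job.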

Change-Id: I60c911c7aae872ee6b94e68efa53638207c0180d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2218
Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <ti...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/0e2a7af3
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/0e2a7af3
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/0e2a7af3

Branch: refs/heads/master
Commit: 0e2a7af3acc058e7247ceccfd116feb612cdf120
Parents: 40ed45f
Author: Murtadha Hubail <mhub...@apache.org>
Authored: Tue Dec 12 06:45:24 2017 +0300
Committer: Murtadha Hubail <mhub...@apache.org>
Committed: Wed Dec 13 09:16:30 2017 -0800

----------------------------------------------------------------------
 .../control/cc/executor/JobExecutor.java        | 48 +++++++++++---------
 .../control/cc/work/TaskCompleteWork.java       | 27 +++++------
 2 files changed, 38 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e2a7af3/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index 01201a6..ab7a3db 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -642,27 +642,32 @@ public class JobExecutor {
         return doomed;
     }
 
-    public void notifyTaskComplete(TaskAttempt ta) throws HyracksException {
-        TaskAttemptId taId = ta.getTaskAttemptId();
-        TaskCluster tc = ta.getTask().getTaskCluster();
-        TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
-        if (lastAttempt == null || taId.getAttempt() != 
lastAttempt.getAttempt()) {
-            LOGGER.warning(
-                    "Ignoring task complete notification: " + taId + " -- 
Current last attempt = " + lastAttempt);
-            return;
-        }
-        TaskAttempt.TaskStatus taStatus = ta.getStatus();
-        if (taStatus != TaskAttempt.TaskStatus.RUNNING) {
-            LOGGER.warning("Spurious task complete notification: " + taId + " 
Current state = " + taStatus);
-            return;
-        }
-        ta.setStatus(TaskAttempt.TaskStatus.COMPLETED, null);
-        ta.setEndTime(System.currentTimeMillis());
-        if (lastAttempt.decrementPendingTasksCounter() == 0) {
-            
lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.COMPLETED);
-            lastAttempt.setEndTime(System.currentTimeMillis());
-            inProgressTaskClusters.remove(tc);
-            startRunnableActivityClusters();
+    public void notifyTaskComplete(TaskAttempt ta) {
+        try {
+            TaskAttemptId taId = ta.getTaskAttemptId();
+            TaskCluster tc = ta.getTask().getTaskCluster();
+            TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
+            if (lastAttempt == null || taId.getAttempt() != 
lastAttempt.getAttempt()) {
+                LOGGER.warning(() -> "Ignoring task complete notification: " + 
taId + " -- Current last attempt = "
+                        + lastAttempt);
+                return;
+            }
+            TaskAttempt.TaskStatus taStatus = ta.getStatus();
+            if (taStatus != TaskAttempt.TaskStatus.RUNNING) {
+                LOGGER.warning(() -> "Spurious task complete notification: " + 
taId + " Current state = " + taStatus);
+                return;
+            }
+            ta.setStatus(TaskAttempt.TaskStatus.COMPLETED, null);
+            ta.setEndTime(System.currentTimeMillis());
+            if (lastAttempt.decrementPendingTasksCounter() == 0) {
+                
lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.COMPLETED);
+                lastAttempt.setEndTime(System.currentTimeMillis());
+                inProgressTaskClusters.remove(tc);
+                startRunnableActivityClusters();
+            }
+        } catch (Exception e) {
+            LOGGER.log(Level.SEVERE, e, () -> "Unexpected failure. Aborting 
job " + jobRun.getJobId());
+            abortJob(Collections.singletonList(e), NoOpCallback.INSTANCE);
         }
     }
 
@@ -724,6 +729,7 @@ public class JobExecutor {
                     ta -> HyracksException.create(ErrorCode.NODE_FAILED, 
ta.getNodeId()));
             startRunnableActivityClusters();
         } catch (Exception e) {
+            LOGGER.log(Level.SEVERE, e, () -> "Unexpected failure. Aborting 
job " + jobRun.getJobId());
             abortJob(Collections.singletonList(e), NoOpCallback.INSTANCE);
         }
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e2a7af3/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java
index f4f2f52..e2f8b0d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java
@@ -21,7 +21,6 @@ package org.apache.hyracks.control.cc.work;
 import java.util.Map;
 
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
-import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.job.IJobManager;
@@ -42,23 +41,19 @@ public class TaskCompleteWork extends 
AbstractTaskLifecycleWork {
 
     @Override
     protected void performEvent(TaskAttempt ta) {
-        try {
-            IJobManager jobManager = ccs.getJobManager();
-            JobRun run = jobManager.get(jobId);
-            if (statistics != null) {
-                JobProfile jobProfile = run.getJobProfile();
-                Map<String, JobletProfile> jobletProfiles = 
jobProfile.getJobletProfiles();
-                JobletProfile jobletProfile = jobletProfiles.get(nodeId);
-                if (jobletProfile == null) {
-                    jobletProfile = new JobletProfile(nodeId);
-                    jobletProfiles.put(nodeId, jobletProfile);
-                }
-                jobletProfile.getTaskProfiles().put(taId, statistics);
+        IJobManager jobManager = ccs.getJobManager();
+        JobRun run = jobManager.get(jobId);
+        if (statistics != null) {
+            JobProfile jobProfile = run.getJobProfile();
+            Map<String, JobletProfile> jobletProfiles = 
jobProfile.getJobletProfiles();
+            JobletProfile jobletProfile = jobletProfiles.get(nodeId);
+            if (jobletProfile == null) {
+                jobletProfile = new JobletProfile(nodeId);
+                jobletProfiles.put(nodeId, jobletProfile);
             }
-            run.getExecutor().notifyTaskComplete(ta);
-        } catch (HyracksException e) {
-            e.printStackTrace();
+            jobletProfile.getTaskProfiles().put(taId, statistics);
         }
+        run.getExecutor().notifyTaskComplete(ta);
     }
 
     @Override

Reply via email to