abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1197

Change subject: Fix Failure Handling on Tasks Start
......................................................................

Fix Failure Handling on Tasks Start

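Previously an NC created the local Joblet lazily inside startTasks(),
so an exception while initializing tasks (e.g. a failure in
createPushRuntime()) was only rethrown on the NC work queue and the
cluster controller was never notified. This change:

- splits job creation out of START_TASKS into a new START_JOB
  function (INodeController.startJob / StartJobWork), sent once per
  node, so the plan bytes no longer travel with every startTasks call
- allows a Joblet to be aborted while its tasks are still being
  initialized (abort flag plus an interruptible initialization thread)
- makes StartTasksWork report task-start failures to the CC via
  NotifyTaskFailureWork instead of rethrowing
- adds a JobFailureTest that drives an operator throwing on
  createPushRuntime()

Illustrative sketch of the resulting CC-side call sequence (the
surrounding per-node loop is abbreviated; see JobScheduler.startTasks
in the diff below for the real code):

    INodeController nc = node.getNodeController();
    if (changed) {
        // ship the serialized ActivityClusterGraph exactly once per node
        nc.startJob(deploymentId, jobId, acgBytes);
    }
    nc.startTasks(jobId, taskDescriptors, connectorPolicies, jobRun.getFlags());
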
Change-Id: I2ec2c798b704ca426d5937f22e6d2bd394a9095a
---
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java
A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartJobWork.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
A hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
A hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java
13 files changed, 523 insertions(+), 199 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/97/1197/1
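  # or, to inspect the patch set without merging (standard Gerrit workflow, same ref as above):
  git fetch ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/97/1197/1 && git checkout FETCH_HEAD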

diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java
index a92e700..d7936e0 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java
@@ -81,8 +81,8 @@
     private static class ClassLoaderObjectInputStream extends ObjectInputStream {
         private ClassLoader classLoader;
 
-        protected ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader)
-                throws IOException, SecurityException {
+        protected ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException,
+                SecurityException {
             super(in);
             this.classLoader = classLoader;
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java
index b377b1a..1541190 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java
@@ -31,9 +31,6 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.json.JSONException;
-import org.json.JSONObject;
-
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.constraints.Constraint;
 import org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression;
@@ -66,6 +63,8 @@
 import org.apache.hyracks.control.cc.work.JobCleanupWork;
 import org.apache.hyracks.control.common.job.PartitionState;
 import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
+import org.json.JSONException;
+import org.json.JSONObject;
 
 public class JobScheduler {
     private static final Logger LOGGER = Logger.getLogger(JobScheduler.class.getName());
@@ -160,7 +159,7 @@
     }
 
     private void startRunnableActivityClusters() throws HyracksException {
-        Set<TaskCluster> taskClusterRoots = new HashSet<TaskCluster>();
+        Set<TaskCluster> taskClusterRoots = new HashSet<>();
         findRunnableTaskClusterRoots(taskClusterRoots, jobRun.getActivityClusterGraph().getActivityClusterMap()
                 .values());
         if (LOGGER.isLoggable(Level.FINE)) {
@@ -312,11 +311,11 @@
         for (int i = 0; i < tasks.length; ++i) {
             Task ts = tasks[i];
             TaskId tid = ts.getTaskId();
-            TaskAttempt taskAttempt = new TaskAttempt(tcAttempt, new TaskAttemptId(new TaskId(tid.getActivityId(),
-                    tid.getPartition()), attempts), ts);
+            TaskAttempt taskAttempt = new TaskAttempt(tcAttempt, new TaskAttemptId(new TaskId(tid.getActivityId(), tid
+                    .getPartition()), attempts), ts);
             taskAttempt.setStatus(TaskAttempt.TaskStatus.INITIALIZED, null);
-            locationMap.put(tid,
-                    new PartitionLocationExpression(tid.getActivityId().getOperatorDescriptorId(), tid.getPartition()));
+            locationMap.put(tid, new PartitionLocationExpression(tid.getActivityId().getOperatorDescriptorId(), tid
+                    .getPartition()));
             taskAttempts.put(tid, taskAttempt);
         }
         tcAttempt.setTaskAttempts(taskAttempts);
@@ -337,8 +336,8 @@
             OperatorDescriptorId opId = tid.getActivityId().getOperatorDescriptorId();
             jobRun.registerOperatorLocation(opId, tid.getPartition(), nodeId);
             ActivityPartitionDetails apd = ts.getActivityPlan().getActivityPartitionDetails();
-            TaskAttemptDescriptor tad = new TaskAttemptDescriptor(taskAttempt.getTaskAttemptId(),
-                    apd.getPartitionCount(), apd.getInputPartitionCounts(), apd.getOutputPartitionCounts());
+            TaskAttemptDescriptor tad = new TaskAttemptDescriptor(taskAttempt.getTaskAttemptId(), apd
+                    .getPartitionCount(), apd.getInputPartitionCounts(), apd.getOutputPartitionCounts());
             tads.add(tad);
         }
         tcAttempt.initializePendingTaskCounter();
@@ -409,8 +408,8 @@
             Object location = solver.getValue(pLocationExpr);
             if (location == null) {
                 // pick any
-                nodeId = liveNodes.toArray(new String[liveNodes.size()])[Math.abs(new Random().nextInt())
-                        % liveNodes.size()];
+                nodeId = liveNodes.toArray(new String[liveNodes.size()])[Math.abs(new Random().nextInt()) % liveNodes
+                        .size()];
             } else if (location instanceof String) {
                 nodeId = (String) location;
             } else if (location instanceof String[]) {
@@ -462,8 +461,8 @@
         final DeploymentId deploymentId = jobRun.getDeploymentId();
         final JobId jobId = jobRun.getJobId();
         final ActivityClusterGraph acg = jobRun.getActivityClusterGraph();
-        final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = new HashMap<ConnectorDescriptorId, IConnectorPolicy>(
-                jobRun.getConnectorPolicyMap());
+        final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies =
+                new HashMap<ConnectorDescriptorId, IConnectorPolicy>(jobRun.getConnectorPolicyMap());
         try {
             byte[] acgBytes = JavaSerializationUtils.serialize(acg);
             for (Map.Entry<String, List<TaskAttemptDescriptor>> entry : taskAttemptMap.entrySet()) {
@@ -476,9 +475,11 @@
                     if (LOGGER.isLoggable(Level.FINE)) {
                         LOGGER.fine("Starting: " + taskDescriptors + " at " + entry.getKey());
                     }
-                    byte[] jagBytes = changed ? acgBytes : null;
-                    node.getNodeController().startTasks(deploymentId, jobId, 
jagBytes, taskDescriptors,
-                            connectorPolicies, jobRun.getFlags());
+                    if (changed) {
+                        // create job once
+                        node.getNodeController().startJob(deploymentId, jobId, acgBytes);
+                    }
+                    node.getNodeController().startTasks(jobId, taskDescriptors, connectorPolicies, jobRun.getFlags());
                 }
             }
         } catch (Exception e) {
@@ -588,8 +589,8 @@
             ConnectorDescriptorId cdId = pid.getConnectorDescriptorId();
             IConnectorPolicy cPolicy = connectorPolicyMap.get(cdId);
             PartitionState maxState = pmm.getMaximumAvailableState(pid);
-            if (maxState == null
-                    || (cPolicy.consumerWaitsForProducerToFinish() && maxState != PartitionState.COMMITTED)) {
+            if (maxState == null || (cPolicy.consumerWaitsForProducerToFinish()
+                    && maxState != PartitionState.COMMITTED)) {
                 if (findDoomedTaskClusters(partitionProducingTaskClusterMap.get(pid), doomedTaskClusters)) {
                     doomed = true;
                 }
@@ -620,7 +621,8 @@
                 LOGGER.warning("Spurious task complete notification: " + taId 
+ " Current state = " + taStatus);
             }
         } else {
-            LOGGER.warning("Ignoring task complete notification: " + taId + " 
-- Current last attempt = " + lastAttempt);
+            LOGGER.warning("Ignoring task complete notification: " + taId + " 
-- Current last attempt = "
+                    + lastAttempt);
         }
     }
 
@@ -680,17 +682,17 @@
                     if (taskClusters != null) {
                         for (TaskCluster tc : taskClusters) {
                             TaskClusterAttempt lastTaskClusterAttempt = findLastTaskClusterAttempt(tc);
-                            if (lastTaskClusterAttempt != null
-                                    && (lastTaskClusterAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED || lastTaskClusterAttempt
+                            if (lastTaskClusterAttempt != null && (lastTaskClusterAttempt
+                                    .getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED
+                                    || lastTaskClusterAttempt
                                             .getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING)) {
                                 boolean abort = false;
                                 for (TaskAttempt ta : lastTaskClusterAttempt.getTaskAttempts().values()) {
-                                    assert (ta.getStatus() == TaskAttempt.TaskStatus.COMPLETED || ta.getStatus() == TaskAttempt.TaskStatus.RUNNING);
+                                    assert (ta.getStatus() == TaskAttempt.TaskStatus.COMPLETED || ta
+                                            .getStatus() == TaskAttempt.TaskStatus.RUNNING);
                                     if (deadNodes.contains(ta.getNodeId())) {
-                                        ta.setStatus(
-                                                TaskAttempt.TaskStatus.FAILED,
-                                                Collections.singletonList(new Exception("Node " + ta.getNodeId()
-                                                        + " failed")));
+                                        ta.setStatus(TaskAttempt.TaskStatus.FAILED, Collections.singletonList(
+                                                new Exception("Node " + ta.getNodeId() + " failed")));
                                         ta.setEndTime(System.currentTimeMillis());
                                         abort = true;
                                     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index dff5827..f8b220b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -35,9 +35,8 @@
 import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
 
 public interface INodeController {
-    public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
-            List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies,
-            EnumSet<JobFlag> flags) throws Exception;
+    public void startTasks(JobId jobId, List<TaskAttemptDescriptor> taskDescriptors,
+            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, EnumSet<JobFlag> flags) throws Exception;
 
     public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception;
 
@@ -56,4 +55,6 @@
     public void sendApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception;
 
     public void takeThreadDump(String requestId) throws Exception;
+
+    public void startJob(DeploymentId deploymentId, JobId jobId, byte[] planBytes) throws Exception;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index aa9a4fe..96cc068 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -66,7 +66,6 @@
 
 public class CCNCFunctions {
     private static final Logger LOGGER = Logger.getLogger(CCNCFunctions.class.getName());
-
     private static final int FID_CODE_SIZE = 1;
 
     public enum FunctionId {
@@ -84,6 +83,7 @@
         REPORT_RESULT_PARTITION_FAILURE,
 
         NODE_REGISTRATION_RESULT,
+        START_JOB,
         START_TASKS,
         ABORT_TASKS,
         CLEANUP_JOBLET,
@@ -670,20 +670,14 @@
 
     public static class StartTasksFunction extends Function {
         private static final long serialVersionUID = 1L;
-
-        private final DeploymentId deploymentId;
         private final JobId jobId;
-        private final byte[] planBytes;
         private final List<TaskAttemptDescriptor> taskDescriptors;
         private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies;
         private final EnumSet<JobFlag> flags;
 
-        public StartTasksFunction(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
-                List<TaskAttemptDescriptor> taskDescriptors,
+        public StartTasksFunction(JobId jobId, List<TaskAttemptDescriptor> taskDescriptors,
                 Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, EnumSet<JobFlag> flags) {
-            this.deploymentId = deploymentId;
             this.jobId = jobId;
-            this.planBytes = planBytes;
             this.taskDescriptors = taskDescriptors;
             this.connectorPolicies = connectorPolicies;
             this.flags = flags;
@@ -694,16 +688,8 @@
             return FunctionId.START_TASKS;
         }
 
-        public DeploymentId getDeploymentId() {
-            return deploymentId;
-        }
-
         public JobId getJobId() {
             return jobId;
-        }
-
-        public byte[] getPlanBytes() {
-            return planBytes;
         }
 
         public List<TaskAttemptDescriptor> getTaskDescriptors() {
@@ -716,6 +702,98 @@
 
         public EnumSet<JobFlag> getFlags() {
             return flags;
+        }
+
+        public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
+            ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), length);
+            DataInputStream dis = new DataInputStream(bais);
+
+            //read jobId and taskId
+            JobId jobId = JobId.create(dis);
+
+            // read task attempt descriptors
+            int tadSize = dis.readInt();
+            List<TaskAttemptDescriptor> taskDescriptors = new ArrayList<TaskAttemptDescriptor>();
+            for (int i = 0; i < tadSize; i++) {
+                TaskAttemptDescriptor tad = TaskAttemptDescriptor.create(dis);
+                taskDescriptors.add(tad);
+            }
+
+            //read connector policies
+            int cpSize = dis.readInt();
+            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies =
+                    new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
+            for (int i = 0; i < cpSize; i++) {
+                ConnectorDescriptorId cid = ConnectorDescriptorId.create(dis);
+                IConnectorPolicy policy = ConnectorPolicyFactory.INSTANCE.getConnectorPolicy(dis);
+                connectorPolicies.put(cid, policy);
+            }
+
+            // read flags
+            int flagSize = dis.readInt();
+            EnumSet<JobFlag> flags = EnumSet.noneOf(JobFlag.class);
+            for (int i = 0; i < flagSize; i++) {
+                flags.add(JobFlag.values()[(dis.readInt())]);
+            }
+            return new StartTasksFunction(jobId, taskDescriptors, connectorPolicies, flags);
+        }
+
+        public static void serialize(OutputStream out, Object object) throws Exception {
+            StartTasksFunction fn = (StartTasksFunction) object;
+            DataOutputStream dos = new DataOutputStream(out);
+
+            //write jobId
+            fn.jobId.writeFields(dos);
+
+            //write task descriptors
+            dos.writeInt(fn.taskDescriptors.size());
+            for (int i = 0; i < fn.taskDescriptors.size(); i++) {
+                fn.taskDescriptors.get(i).writeFields(dos);
+            }
+
+            //write connector policies
+            dos.writeInt(fn.connectorPolicies.size());
+            for (Entry<ConnectorDescriptorId, IConnectorPolicy> entry : fn.connectorPolicies.entrySet()) {
+                entry.getKey().writeFields(dos);
+                ConnectorPolicyFactory.INSTANCE.writeConnectorPolicy(entry.getValue(), dos);
+            }
+
+            //write flags
+            dos.writeInt(fn.flags.size());
+            for (JobFlag flag : fn.flags) {
+                dos.writeInt(flag.ordinal());
+            }
+        }
+    }
+
+    public static class StartJobFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final DeploymentId deploymentId;
+        private final JobId jobId;
+        private final byte[] planBytes;
+
+        public StartJobFunction(DeploymentId deploymentId, JobId jobId, byte[] planBytes) {
+            this.deploymentId = deploymentId;
+            this.jobId = jobId;
+            this.planBytes = planBytes;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.START_JOB;
+        }
+
+        public DeploymentId getDeploymentId() {
+            return deploymentId;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public byte[] getPlanBytes() {
+            return planBytes;
         }
 
         public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
@@ -737,36 +815,11 @@
                 planBytes = new byte[planBytesSize];
                 dis.read(planBytes, 0, planBytesSize);
             }
-
-            // read task attempt descriptors
-            int tadSize = dis.readInt();
-            List<TaskAttemptDescriptor> taskDescriptors = new ArrayList<TaskAttemptDescriptor>();
-            for (int i = 0; i < tadSize; i++) {
-                TaskAttemptDescriptor tad = TaskAttemptDescriptor.create(dis);
-                taskDescriptors.add(tad);
-            }
-
-            //read connector policies
-            int cpSize = dis.readInt();
-            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
-            for (int i = 0; i < cpSize; i++) {
-                ConnectorDescriptorId cid = ConnectorDescriptorId.create(dis);
-                IConnectorPolicy policy = ConnectorPolicyFactory.INSTANCE.getConnectorPolicy(dis);
-                connectorPolicies.put(cid, policy);
-            }
-
-            // read flags
-            int flagSize = dis.readInt();
-            EnumSet<JobFlag> flags = EnumSet.noneOf(JobFlag.class);
-            for (int i = 0; i < flagSize; i++) {
-                flags.add(JobFlag.values()[(dis.readInt())]);
-            }
-
-            return new StartTasksFunction(deploymentId, jobId, planBytes, taskDescriptors, connectorPolicies, flags);
+            return new StartJobFunction(deploymentId, jobId, planBytes);
         }
 
         public static void serialize(OutputStream out, Object object) throws Exception {
-            StartTasksFunction fn = (StartTasksFunction) object;
+            StartJobFunction fn = (StartJobFunction) object;
             DataOutputStream dos = new DataOutputStream(out);
 
             //write jobId and deploymentId
@@ -780,25 +833,6 @@
             dos.writeInt(fn.planBytes == null ? -1 : fn.planBytes.length);
             if (fn.planBytes != null) {
                 dos.write(fn.planBytes, 0, fn.planBytes.length);
-            }
-
-            //write task descriptors
-            dos.writeInt(fn.taskDescriptors.size());
-            for (int i = 0; i < fn.taskDescriptors.size(); i++) {
-                fn.taskDescriptors.get(i).writeFields(dos);
-            }
-
-            //write connector policies
-            dos.writeInt(fn.connectorPolicies.size());
-            for (Entry<ConnectorDescriptorId, IConnectorPolicy> entry : fn.connectorPolicies.entrySet()) {
-                entry.getKey().writeFields(dos);
-                ConnectorPolicyFactory.INSTANCE.writeConnectorPolicy(entry.getValue(), dos);
-            }
-
-            //write flags
-            dos.writeInt(fn.flags.size());
-            for (JobFlag flag : fn.flags) {
-                dos.writeInt(flag.ordinal());
             }
         }
     }
@@ -1320,7 +1354,8 @@
         int cdid = dis.readInt();
         int senderIndex = dis.readInt();
         int receiverIndex = dis.readInt();
-        PartitionId pid = new PartitionId(new JobId(jobId), new ConnectorDescriptorId(cdid), senderIndex, receiverIndex);
+        PartitionId pid = new PartitionId(new JobId(jobId), new ConnectorDescriptorId(cdid), senderIndex,
+                receiverIndex);
         return pid;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index c3376e6..600b5c1 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -44,11 +44,10 @@
     }
 
     @Override
-    public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
-            List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies,
-            EnumSet<JobFlag> flags) throws Exception {
-        CCNCFunctions.StartTasksFunction stf = new CCNCFunctions.StartTasksFunction(deploymentId, jobId, planBytes,
-                taskDescriptors, connectorPolicies, flags);
+    public void startTasks(JobId jobId, List<TaskAttemptDescriptor> taskDescriptors,
+            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, EnumSet<JobFlag> flags) throws Exception {
+        CCNCFunctions.StartTasksFunction stf = new CCNCFunctions.StartTasksFunction(jobId, taskDescriptors,
+                connectorPolicies, flags);
         ipcHandle.send(-1, stf, null);
     }
 
@@ -107,4 +106,10 @@
         CCNCFunctions.ThreadDumpRequestFunction fn = new CCNCFunctions.ThreadDumpRequestFunction(requestId);
         ipcHandle.send(-1, fn, null);
     }
+
+    @Override
+    public void startJob(DeploymentId deploymentId, JobId jobId, byte[] planBytes) throws Exception {
+        CCNCFunctions.StartJobFunction stf = new CCNCFunctions.StartJobFunction(deploymentId, jobId, planBytes);
+        ipcHandle.send(-1, stf, null);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
index 0d9ff5d..21e252a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
@@ -24,6 +24,7 @@
 import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Logger;
 
@@ -97,6 +98,9 @@
 
     private final AtomicLong memoryAllocation;
 
+    private final AtomicBoolean aborted;
+    private volatile Thread taskInitializationThreads;
+
     private JobStatus cleanupStatus;
 
     private boolean cleanupPending;
@@ -128,6 +132,7 @@
         }
         IGlobalJobDataFactory gjdf = acg.getGlobalJobDataFactory();
         globalJobData = gjdf != null ? gjdf.createGlobalJobData(this) : null;
+        aborted = new AtomicBoolean(false);
     }
 
     @Override
@@ -165,6 +170,7 @@
             this.nodeId = nodeId;
         }
 
+        @Override
         public String toString() {
             return super.toString() + "@" + nodeId;
         }
@@ -334,4 +340,20 @@
     public ClassLoader getClassLoader() throws HyracksException {
         return DeploymentUtils.getClassLoader(deploymentId, appCtx);
     }
+
+    public boolean isAborted() {
+        return aborted.get();
+    }
+
+    public synchronized void abort() {
+        if (taskInitializationThreads != null) {
+            taskInitializationThreads.interrupt();
+            taskInitializationThreads = null;
+        }
+        aborted.set(true);
+    }
+
+    public void setTaskInitializationThreads(Thread taskInitializationThreads) {
+        this.taskInitializationThreads = taskInitializationThreads;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index d7facf0..ecde636 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -84,11 +84,12 @@
 import org.apache.hyracks.control.nc.work.BuildJobProfilesWork;
 import org.apache.hyracks.control.nc.work.CleanupJobletWork;
 import org.apache.hyracks.control.nc.work.DeployBinaryWork;
+import org.apache.hyracks.control.nc.work.NodeThreadDumpWork;
 import org.apache.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
 import org.apache.hyracks.control.nc.work.ShutdownWork;
+import org.apache.hyracks.control.nc.work.StartJobWork;
 import org.apache.hyracks.control.nc.work.StartTasksWork;
 import org.apache.hyracks.control.nc.work.StateDumpWork;
-import org.apache.hyracks.control.nc.work.NodeThreadDumpWork;
 import org.apache.hyracks.control.nc.work.UnDeployBinaryWork;
 import org.apache.hyracks.ipc.api.IIPCHandle;
 import org.apache.hyracks.ipc.api.IIPCI;
@@ -184,8 +185,8 @@
         queue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread.
         jobletMap = new Hashtable<JobId, Joblet>();
         timer = new Timer(true);
-        serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER,
-                new File(new File(NodeControllerService.class.getName()), id));
+        serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER, new File(new File(
+                NodeControllerService.class.getName()), id));
         memoryMXBean = ManagementFactory.getMemoryMXBean();
         gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans();
         threadMXBean = ManagementFactory.getThreadMXBean();
@@ -257,8 +258,8 @@
                 ncConfig.resultPublicPort, FullFrameChannelInterfaceFactory.INSTANCE);
         if (ncConfig.messagingIPAddress != null && appCtx.getMessagingChannelInterfaceFactory() != null) {
             messagingNetManager = new MessagingNetworkManager(this, ncConfig.messagingIPAddress, ncConfig.messagingPort,
-                    ncConfig.nNetThreads, ncConfig.messagingPublicIPAddress, ncConfig.messagingPublicPort,
-                    appCtx.getMessagingChannelInterfaceFactory());
+                    ncConfig.nNetThreads, ncConfig.messagingPublicIPAddress, ncConfig.messagingPublicPort, appCtx
+                            .getMessagingChannelInterfaceFactory());
         }
     }
 
@@ -288,10 +289,10 @@
         NetworkAddress netAddress = netManager.getPublicNetworkAddress();
         NetworkAddress meesagingPort = messagingNetManager != null ? messagingNetManager.getPublicNetworkAddress()
                 : null;
-        ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress, datasetAddress,
-                osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(),
-                runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(),
-                runtimeMXBean.getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
+        ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress, datasetAddress, osMXBean
+                .getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(), runtimeMXBean
+                        .getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean
+                                .getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
                 runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema, meesagingPort,
                 PidHelper.getPid()));
 
@@ -337,8 +338,8 @@
         if (className != null) {
             Class<?> c = Class.forName(className);
             ncAppEntryPoint = (INCApplicationEntryPoint) c.newInstance();
-            String[] args = ncConfig.appArgs == null ? new String[0]
-                    : ncConfig.appArgs.toArray(new String[ncConfig.appArgs.size()]);
+            String[] args = ncConfig.appArgs == null ? new String[0] : ncConfig.appArgs.toArray(
+                    new String[ncConfig.appArgs.size()]);
             ncAppEntryPoint.start(appCtx, args);
         }
         executor = Executors.newCachedThreadPool(appCtx.getThreadFactory());
@@ -517,14 +518,20 @@
                 case SEND_APPLICATION_MESSAGE:
                     CCNCFunctions.SendApplicationMessageFunction amf =
                             (CCNCFunctions.SendApplicationMessageFunction) fn;
-                    queue.schedule(new ApplicationMessageWork(NodeControllerService.this, amf.getMessage(),
-                            amf.getDeploymentId(), amf.getNodeId()));
+                    queue.schedule(new ApplicationMessageWork(NodeControllerService.this, amf.getMessage(), amf
+                            .getDeploymentId(), amf.getNodeId()));
+                    return;
+
+                case START_JOB:
+                    CCNCFunctions.StartJobFunction sjf = (CCNCFunctions.StartJobFunction) fn;
+                    queue.schedule(new StartJobWork(NodeControllerService.this, sjf.getDeploymentId(), sjf.getJobId(),
+                            sjf.getPlanBytes()));
                     return;
 
                 case START_TASKS:
                     CCNCFunctions.StartTasksFunction stf = 
(CCNCFunctions.StartTasksFunction) fn;
-                    queue.schedule(new 
StartTasksWork(NodeControllerService.this, stf.getDeploymentId(), 
stf.getJobId(),
-                            stf.getPlanBytes(), stf.getTaskDescriptors(), 
stf.getConnectorPolicies(), stf.getFlags()));
+                    queue.schedule(new 
StartTasksWork(NodeControllerService.this, stf.getJobId(), stf
+                            .getTaskDescriptors(), stf.getConnectorPolicies(), 
stf.getFlags()));
                     return;
 
                 case ABORT_TASKS:
@@ -540,8 +547,8 @@
                 case REPORT_PARTITION_AVAILABILITY:
                     CCNCFunctions.ReportPartitionAvailabilityFunction rpaf =
                             (CCNCFunctions.ReportPartitionAvailabilityFunction) fn;
-                    queue.schedule(new ReportPartitionAvailabilityWork(NodeControllerService.this,
-                            rpaf.getPartitionId(), rpaf.getNetworkAddress()));
+                    queue.schedule(new ReportPartitionAvailabilityWork(NodeControllerService.this, rpaf
+                            .getPartitionId(), rpaf.getNetworkAddress()));
                     return;
 
                 case NODE_REGISTRATION_RESULT:
@@ -557,8 +564,8 @@
 
                 case DEPLOY_BINARY:
                     CCNCFunctions.DeployBinaryFunction dbf = (CCNCFunctions.DeployBinaryFunction) fn;
-                    queue.schedule(new DeployBinaryWork(NodeControllerService.this, dbf.getDeploymentId(),
-                            dbf.getBinaryURLs()));
+                    queue.schedule(new DeployBinaryWork(NodeControllerService.this, dbf.getDeploymentId(), dbf
+                            .getBinaryURLs()));
                     return;
 
                 case UNDEPLOY_BINARY:
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java
index 6566655..a8ca58e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java
@@ -40,10 +40,17 @@
 
     private final List<TaskAttemptId> tasks;
 
+    private final Joblet ji;
+
     public AbortTasksWork(NodeControllerService ncs, JobId jobId, List<TaskAttemptId> tasks) {
         this.ncs = ncs;
         this.jobId = jobId;
         this.tasks = tasks;
+        Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
+        ji = jobletMap.get(jobId);
+        if (ji != null) {
+            ji.abort();
+        }
     }
 
     @Override
@@ -55,9 +62,6 @@
         if (dpm != null) {
             ncs.getDatasetPartitionManager().abortReader(jobId);
         }
-
-        Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
-        Joblet ji = jobletMap.get(jobId);
         if (ji != null) {
             Map<TaskAttemptId, Task> taskMap = ji.getTaskMap();
             for (TaskAttemptId taId : tasks) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartJobWork.java
new file mode 100644
index 0000000..de72d1c
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartJobWork.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.work;
+
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.common.deployment.DeploymentUtils;
+import org.apache.hyracks.control.common.work.AbstractWork;
+import org.apache.hyracks.control.nc.Joblet;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.control.nc.application.NCApplicationContext;
+
+public class StartJobWork extends AbstractWork {
+    private static final Logger LOGGER = Logger.getLogger(StartJobWork.class.getName());
+    private final NodeControllerService ncs;
+    private final DeploymentId deploymentId;
+    private final JobId jobId;
+    private final byte[] acgBytes;
+
+    public StartJobWork(NodeControllerService ncs, DeploymentId deploymentId, JobId jobId, byte[] acgBytes) {
+        this.ncs = ncs;
+        this.deploymentId = deploymentId;
+        this.jobId = jobId;
+        this.acgBytes = acgBytes;
+    }
+
+    @Override
+    public void run() {
+        try {
+            NCApplicationContext appCtx = ncs.getApplicationContext();
+            Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
+            Joblet ji = jobletMap.get(jobId);
+            if (ji != null) {
+                throw new RuntimeException("Job " + jobId + " already exists");
+            }
+            if (acgBytes == null) {
+                throw new NullPointerException("JobActivityGraph was null");
+            }
+            ActivityClusterGraph acg = (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, deploymentId,
+                    appCtx);
+            ji = new Joblet(ncs, deploymentId, jobId, appCtx, acg);
+            jobletMap.put(jobId, ji);
+        } catch (Exception e) {
+            // TODO: notify the CC of the start-job failure; for now it is only logged and rethrown
+            LOGGER.log(Level.SEVERE, "Failure during job " + jobId + " creation on NC " + ncs.getId(), e);
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index b585199..a760b32 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -28,7 +28,6 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.hyracks.api.application.INCApplicationContext;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.IPartitionCollector;
 import org.apache.hyracks.api.comm.IPartitionWriterFactory;
@@ -45,7 +44,6 @@
 import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.ActivityCluster;
 import org.apache.hyracks.api.job.ActivityClusterGraph;
@@ -53,13 +51,12 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.partitions.PartitionId;
 import org.apache.hyracks.comm.channels.NetworkInputChannel;
-import org.apache.hyracks.control.common.deployment.DeploymentUtils;
 import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
+import org.apache.hyracks.control.common.utils.ExceptionUtils;
 import org.apache.hyracks.control.common.work.AbstractWork;
 import org.apache.hyracks.control.nc.Joblet;
 import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.control.nc.Task;
-import org.apache.hyracks.control.nc.application.NCApplicationContext;
 import org.apache.hyracks.control.nc.partitions.MaterializedPartitionWriter;
 import org.apache.hyracks.control.nc.partitions.MaterializingPipelinedPartition;
 import org.apache.hyracks.control.nc.partitions.PipelinedPartition;
@@ -71,11 +68,7 @@
 
     private final NodeControllerService ncs;
 
-    private final DeploymentId deploymentId;
-
     private final JobId jobId;
-
-    private final byte[] acgBytes;
 
     private final List<TaskAttemptDescriptor> taskDescriptors;
 
@@ -83,13 +76,10 @@
 
     private final EnumSet<JobFlag> flags;
 
-    public StartTasksWork(NodeControllerService ncs, DeploymentId deploymentId, JobId jobId, byte[] acgBytes,
-            List<TaskAttemptDescriptor> taskDescriptors,
+    public StartTasksWork(NodeControllerService ncs, JobId jobId, List<TaskAttemptDescriptor> taskDescriptors,
             Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap, EnumSet<JobFlag> flags) {
         this.ncs = ncs;
-        this.deploymentId = deploymentId;
         this.jobId = jobId;
-        this.acgBytes = acgBytes;
         this.taskDescriptors = taskDescriptors;
         this.connectorPoliciesMap = connectorPoliciesMap;
         this.flags = flags;
@@ -97,9 +87,17 @@
 
     @Override
     public void run() {
+        Joblet joblet = ncs.getJobletMap().get(jobId);
+        if (joblet == null) {
+            LOGGER.log(Level.SEVERE, "Joblet not found for job " + jobId + " 
on Node " + ncs.getId());
+            return;
+        }
+        if (joblet.isAborted()) {
+            return;
+        }
+        Task task = null;
         try {
-            NCApplicationContext appCtx = ncs.getApplicationContext();
-            final Joblet joblet = getOrCreateLocalJoblet(deploymentId, jobId, appCtx, acgBytes);
+            joblet.setTaskInitializationThreads(Thread.currentThread());
             final ActivityClusterGraph acg = joblet.getActivityClusterGraph();
 
             IRecordDescriptorProvider rdp = new IRecordDescriptorProvider() {
@@ -129,8 +127,9 @@
                 }
                 final int partition = tid.getPartition();
                 List<IConnectorDescriptor> inputs = ac.getActivityInputMap().get(aid);
-                Task task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutorService(), ncs,
+                task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutorService(), ncs,
                         createInputChannels(td, inputs));
+                // following call could throw an exception
                 IOperatorNodePushable operator = han.createPushRuntime(task, rdp, partition, td.getPartitionCount());
 
                 List<IPartitionCollector> collectors = new ArrayList<IPartitionCollector>();
@@ -161,47 +160,40 @@
                         if (LOGGER.isLoggable(Level.INFO)) {
                             LOGGER.info("output: " + i + ": " + 
conn.getConnectorId());
                         }
-                        IFrameWriter writer = conn.createPartitioner(task, recordDesc, pwFactory, partition,
-                                td.getPartitionCount(), td.getOutputPartitionCounts()[i]);
+                        IFrameWriter writer = conn.createPartitioner(task, recordDesc, pwFactory, partition, td
+                                .getPartitionCount(), td.getOutputPartitionCounts()[i]);
                         operator.setOutputFrameWriter(i, writer, recordDesc);
                     }
                 }
 
                 task.setTaskRuntime(collectors.toArray(new IPartitionCollector[collectors.size()]), operator);
+                if (joblet.isAborted()) {
+                    return;
+                }
                 joblet.addTask(task);
-
                 task.start();
             }
         } catch (Exception e) {
-            e.printStackTrace();
-            throw new RuntimeException(e);
-        }
-    }
-
-    private Joblet getOrCreateLocalJoblet(DeploymentId deploymentId, JobId jobId, INCApplicationContext appCtx,
-            byte[] acgBytes) throws Exception {
-        Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
-        Joblet ji = jobletMap.get(jobId);
-        if (ji == null) {
-            if (acgBytes == null) {
-                throw new NullPointerException("JobActivityGraph was null");
+            LOGGER.log(Level.SEVERE, "Failure starting task", e);
+            // notify cc of start task failure
+            List<Exception> exceptions = new ArrayList<>();
+            exceptions.add(e);
+            ExceptionUtils.setNodeIds(exceptions, ncs.getId());
+            ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, task, exceptions));
+        } finally {
+            if (joblet != null) {
+                joblet.setTaskInitializationThreads(null);
             }
-            ActivityClusterGraph acg = (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, deploymentId,
-                    appCtx);
-            ji = new Joblet(ncs, deploymentId, jobId, appCtx, acg);
-            jobletMap.put(jobId, ji);
         }
-        return ji;
     }
 
    private IPartitionCollector createPartitionCollector(TaskAttemptDescriptor td, final int partition, Task task,
             int i, IConnectorDescriptor conn, RecordDescriptor recordDesc, IConnectorPolicy cPolicy)
             throws HyracksDataException {
-        IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition,
-                td.getInputPartitionCounts()[i], td.getPartitionCount());
+        IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition, td
+                .getInputPartitionCounts()[i], td.getPartitionCount());
         if (cPolicy.materializeOnReceiveSide()) {
-            return new ReceiveSideMaterializingCollector(task, ncs.getPartitionManager(), collector,
-                    task.getTaskAttemptId(), ncs.getExecutorService());
+            return new ReceiveSideMaterializingCollector(task, ncs.getPartitionManager(), collector, task
+                    .getTaskAttemptId(), ncs.getExecutorService());
         } else {
             return collector;
         }
@@ -225,7 +217,8 @@
                     @Override
                     public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
                         return new MaterializingPipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(
-                                jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs.getExecutorService());
+                                jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs
+                                        .getExecutorService());
                     }
                 };
             }
@@ -233,8 +226,8 @@
             factory = new IPartitionWriterFactory() {
                 @Override
                 public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
-                    return new PipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(jobId,
-                            conn.getConnectorId(), senderIndex, receiverIndex), taId);
+                    return new PipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(jobId, conn
+                            .getConnectorId(), senderIndex, receiverIndex), taId);
                 }
             };
         }
@@ -254,8 +247,8 @@
      * @return a list of known channels, one for each connector
      * @throws UnknownHostException
      */
-    private List<List<PartitionChannel>> createInputChannels(TaskAttemptDescriptor td, List<IConnectorDescriptor> inputs)
-            throws UnknownHostException {
+    private List<List<PartitionChannel>> createInputChannels(TaskAttemptDescriptor td,
+            List<IConnectorDescriptor> inputs) throws UnknownHostException {
         NetworkAddress[][] inputAddresses = td.getInputPartitionLocations();
         List<List<PartitionChannel>> channelsForInputConnectors = new ArrayList<List<PartitionChannel>>();
         if (inputAddresses != null) {
@@ -266,8 +259,8 @@
                         NetworkAddress networkAddress = inputAddresses[i][j];
                         PartitionId pid = new PartitionId(jobId, inputs.get(i).getConnectorId(), j, td
                                 .getTaskAttemptId().getTaskId().getPartition());
-                        PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(
-                                ncs.getNetworkManager(), new InetSocketAddress(InetAddress.getByAddress(networkAddress
+                        PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ncs
+                                .getNetworkManager(), new InetSocketAddress(InetAddress.getByAddress(networkAddress
                                         .lookupIpAddress()), networkAddress.getPort()), pid, 5));
                         channels.add(channel);
                     }
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index 9685837..72569f3 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -27,12 +27,6 @@
 import java.util.logging.Logger;
 
 import org.apache.commons.io.FileUtils;
-import org.json.JSONArray;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-
 import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
@@ -51,6 +45,11 @@
 import org.apache.hyracks.control.nc.resources.memory.FrameManager;
 import org.apache.hyracks.dataflow.common.comm.io.ResultFrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import org.json.JSONArray;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
 
 public abstract class AbstractMultiNCIntegrationTest {
 
@@ -71,8 +70,7 @@
     public TemporaryFolder outputFolder = new TemporaryFolder();
 
     public AbstractMultiNCIntegrationTest() {
-        outputFiles = new ArrayList<File>();
-        ;
+        outputFiles = new ArrayList<File>();
     }
 
     @BeforeClass
@@ -135,37 +133,38 @@
 
         IFrameTupleAccessor frameTupleAccessor = new ResultFrameTupleAccessor();
 
-        IHyracksDataset hyracksDataset = new HyracksDataset(hcc, spec.getFrameSize(), nReaders);
-        IHyracksDatasetReader reader = hyracksDataset.createReader(jobId, spec.getResultSetIds().get(0));
+        if (!spec.getResultSetIds().isEmpty()) {
+            IHyracksDataset hyracksDataset = new HyracksDataset(hcc, spec.getFrameSize(), nReaders);
+            IHyracksDatasetReader reader = hyracksDataset.createReader(jobId, spec.getResultSetIds().get(0));
 
-        JSONArray resultRecords = new JSONArray();
-        ByteBufferInputStream bbis = new ByteBufferInputStream();
+            JSONArray resultRecords = new JSONArray();
+            ByteBufferInputStream bbis = new ByteBufferInputStream();
 
-        int readSize = reader.read(resultFrame);
+            int readSize = reader.read(resultFrame);
 
-        while (readSize > 0) {
+            while (readSize > 0) {
 
-            try {
-                frameTupleAccessor.reset(resultFrame.getBuffer());
-                for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
-                    int start = frameTupleAccessor.getTupleStartOffset(tIndex);
-                    int length = frameTupleAccessor.getTupleEndOffset(tIndex) - start;
-                    bbis.setByteBuffer(resultFrame.getBuffer(), start);
-                    byte[] recordBytes = new byte[length];
-                    bbis.read(recordBytes, 0, length);
-                    resultRecords.put(new String(recordBytes, 0, length));
-                }
-            } finally {
                 try {
-                    bbis.close();
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
+                    frameTupleAccessor.reset(resultFrame.getBuffer());
+                    for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
+                        int start = frameTupleAccessor.getTupleStartOffset(tIndex);
+                        int length = frameTupleAccessor.getTupleEndOffset(tIndex) - start;
+                        bbis.setByteBuffer(resultFrame.getBuffer(), start);
+                        byte[] recordBytes = new byte[length];
+                        bbis.read(recordBytes, 0, length);
+                        resultRecords.put(new String(recordBytes, 0, length));
+                    }
+                } finally {
+                    try {
+                        bbis.close();
+                    } catch (IOException e) {
+                        throw new HyracksDataException(e);
+                    }
                 }
+
+                readSize = reader.read(resultFrame);
             }
-
-            readSize = reader.read(resultFrame);
         }
-
         hcc.waitForCompletion(jobId);
         dumpOutputFiles();
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
new file mode 100644
index 0000000..5646bbe
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.tests.integration;
+
+import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
+import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.misc.SinkOperatorDescriptor;
+import org.apache.hyracks.tests.util.ExceptionOnCreatePushRuntimeOperatorDescriptor;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JobFailureTest extends AbstractMultiNCIntegrationTest {
+
+    @Test
+    public void failureOnCreatePushRuntime() throws Exception {
+        JobSpecification spec = new JobSpecification();
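+        // Source operator with no inputs and one output: the partition listed in
+        // exceptionPartitions (4) throws in createPushRuntime, while the remaining
+        // partitions sleep in initialize() until the job abort interrupts them.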
+        AbstractSingleActivityOperatorDescriptor sourceOpDesc = new ExceptionOnCreatePushRuntimeOperatorDescriptor(spec,
+                0, 1, new int[] { 4 }, Integer.MAX_VALUE);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sourceOpDesc, ASTERIX_IDS);
+        SinkOperatorDescriptor sinkOpDesc = new SinkOperatorDescriptor(spec, 1);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sinkOpDesc, ASTERIX_IDS);
+        IConnectorDescriptor conn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn, sourceOpDesc, 0, sinkOpDesc, 0);
+        spec.addRoot(sinkOpDesc);
+        try {
+            runTest(spec);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw e;
+        }
+        Assert.assertTrue(ExceptionOnCreatePushRuntimeOperatorDescriptor.succeed());
+        // TODO: should also check the contents of the different NCs
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java
new file mode 100644
index 0000000..9f0a6d9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.tests.util;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+public class ExceptionOnCreatePushRuntimeOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
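+    // Lifecycle counters shared across all partitions. Every open/initialize must be
+    // matched by a close/deinitialize, so each counter should drain back to zero once
+    // the failed job has been fully aborted.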
+    private static AtomicInteger createPushRuntime = new AtomicInteger();
+    private static AtomicInteger initializeCounter = new AtomicInteger();
+    private static AtomicInteger openCloseCounter = new AtomicInteger();
+    private final int[] exceptionPartitions;
+    private final int duration;
+
+    public ExceptionOnCreatePushRuntimeOperatorDescriptor(IOperatorDescriptorRegistry spec, int inputArity,
+            int outputArity, int[] exceptionPartitions, int duration) {
+        super(spec, inputArity, outputArity);
+        this.exceptionPartitions = exceptionPartitions;
+        this.duration = duration;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        createPushRuntime.incrementAndGet();
+        try {
+            if (exceptionPartitions != null) {
+                for (int p : exceptionPartitions) {
+                    if (p == partition) {
+                        throw new HyracksDataException("I throw exceptions");
+                    }
+                }
+            }
+            return new IOperatorNodePushable() {
+                @Override
+                public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc)
+                        throws HyracksDataException {
+                }
+
+                @Override
+                public void initialize() throws HyracksDataException {
+                    initializeCounter.incrementAndGet();
+                    if (duration > 0) {
+                        try {
+                            Thread.sleep(duration);
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            throw new HyracksDataException(e);
+                        }
+                    }
+                }
+
+                @Override
+                public IFrameWriter getInputFrameWriter(int index) {
+                    return new IFrameWriter() {
+                        @Override
+                        public void open() throws HyracksDataException {
+                            openCloseCounter.incrementAndGet();
+                        }
+
+                        @Override
+                        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                        }
+
+                        @Override
+                        public void fail() throws HyracksDataException {
+                        }
+
+                        @Override
+                        public void close() throws HyracksDataException {
+                            openCloseCounter.decrementAndGet();
+                        }
+                    };
+                }
+
+                @Override
+                public int getInputArity() {
+                    return inputArity;
+                }
+
+                @Override
+                public String getDisplayName() {
+                    return ExceptionOnCreatePushRuntimeOperatorDescriptor.class.getSimpleName()
+                            + ".OperatorNodePushable:" + partition;
+                }
+
+                @Override
+                public void deinitialize() throws HyracksDataException {
+                    initializeCounter.decrementAndGet();
+                }
+            };
+        } finally {
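+            // Net out the increment at method entry, whether this partition returned
+            // a pushable or threw above.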
+            createPushRuntime.decrementAndGet();
+        }
+    }
+
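+    // Returns true iff all counters are back at zero, i.e. every task started for the
+    // failed job was cleaned up (opens closed, initializes deinitialized).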
+    public static boolean succeed() {
+        boolean success = openCloseCounter.get() == 0 && createPushRuntime.get() == 0 && initializeCounter.get() == 0;
+        if (!success) {
+            System.err.println("Failure:");
+            System.err.println("CreatePushRuntime:" + createPushRuntime.get());
+            System.err.println("InitializeCounter:" + initializeCounter.get());
+            System.err.println("OpenCloseCounter:" + openCloseCounter.get());
+        }
+        return success;
+    }
+}

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1197
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I2ec2c798b704ca426d5937f22e6d2bd394a9095a
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>
