Yingyi Bu has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1424

Change subject: WIP: concurrent query management support.
......................................................................

WIP: concurrent query management support.

Change-Id: I8fb6fda57efa139114dd234e08cc7de7129468c8
---
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
A 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IComputationResource.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobStatus.java
A 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobResourceController.java
A 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobResourceController.java
A 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/MemoryOnlyJobResourceController.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
A 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
R 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobExecutor.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetActivityClusterGraphJSONWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobInfoWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobRunJSONWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobStatusWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobSummariesJSONWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterPartitionRequestWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportProfilesWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
28 files changed, 518 insertions(+), 267 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/24/1424/1

diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
index 00c2cc4..9a908ab 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
@@ -75,10 +75,8 @@
         acg.setFrameSize(spec.getFrameSize());
         acg.setMaxReattempts(spec.getMaxReattempts());
         
acg.setJobletEventListenerFactory(spec.getJobletEventListenerFactory());
-        acg.setGlobalJobDataFactory(spec.getGlobalJobDataFactory());
         
acg.setConnectorPolicyAssignmentPolicy(spec.getConnectorPolicyAssignmentPolicy());
         
acg.setUseConnectorPolicyForScheduling(spec.isUseConnectorPolicyForScheduling());
-        acg.setReportTaskDetails(spec.isReportTaskDetails());
         final Set<Constraint> constraints = new HashSet<Constraint>();
         final IConstraintAcceptor acceptor = new IConstraintAcceptor() {
             @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IComputationResource.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IComputationResource.java
new file mode 100644
index 0000000..deaf07e
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IComputationResource.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.job;
+
+public interface IComputationResource {
+
+    long DENY_ANYTHING_RESOURCE_COUNT = -1;
+
+    default long getAggregatedMemoryByteSize() {
+        return DENY_ANYTHING_RESOURCE_COUNT;
+    }
+
+    default long getAggregatedAvailableCores() {
+        return DENY_ANYTHING_RESOURCE_COUNT;
+    }
+
+    default long getMemoryByteSize(String nodeId) {
+        return DENY_ANYTHING_RESOURCE_COUNT;
+    }
+
+    default long getAvailableCores(String nodeId) {
+        return DENY_ANYTHING_RESOURCE_COUNT;
+    }
+
+    void setAggregatedMemoryByteSize(long aggregatedMemoryByteSize);
+
+    void setAggregaedAvailableCores(int aggregatedAvailableCores);
+
+    void setMemoryByteSize(String nodeId, long memoryByteSize);
+
+    void setAvailableCores(String nodeId, int availableCores);
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
index 84a961e..40287b7 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
@@ -28,7 +28,6 @@
 import java.util.Map;
 import java.util.Set;
 
-import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -44,6 +43,8 @@
 import 
org.apache.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.job.resource.DefaultJobResourceController;
+import org.apache.hyracks.api.job.resource.IJobResourceController;
 
 public class JobSpecification implements Serializable, 
IOperatorDescriptorRegistry, IConnectorDescriptorRegistry {
     private static final long serialVersionUID = 1L;
@@ -76,11 +77,9 @@
 
     private IJobletEventListenerFactory jobletEventListenerFactory;
 
-    private IGlobalJobDataFactory globalJobDataFactory;
-
     private boolean useConnectorPolicyForScheduling;
 
-    private boolean reportTaskDetails;
+    private IJobResourceController jobResourceConstraint;
 
     private transient int operatorIdCounter;
 
@@ -106,7 +105,7 @@
         connectorIdCounter = 0;
         maxReattempts = 2;
         useConnectorPolicyForScheduling = false;
-        reportTaskDetails = true;
+        jobResourceConstraint = DefaultJobResourceController.INSTANCE;
         setFrameSize(frameSize);
     }
 
@@ -281,14 +280,6 @@
         this.jobletEventListenerFactory = jobletEventListenerFactory;
     }
 
-    public IGlobalJobDataFactory getGlobalJobDataFactory() {
-        return globalJobDataFactory;
-    }
-
-    public void setGlobalJobDataFactory(IGlobalJobDataFactory 
globalJobDataFactory) {
-        this.globalJobDataFactory = globalJobDataFactory;
-    }
-
     public boolean isUseConnectorPolicyForScheduling() {
         return useConnectorPolicyForScheduling;
     }
@@ -297,12 +288,12 @@
         this.useConnectorPolicyForScheduling = useConnectorPolicyForScheduling;
     }
 
-    public boolean isReportTaskDetails() {
-        return reportTaskDetails;
+    public void setJobResourceController(IJobResourceController 
jobResourceConstraint) {
+        this.jobResourceConstraint = jobResourceConstraint;
     }
 
-    public void setReportTaskDetails(boolean reportTaskDetails) {
-        this.reportTaskDetails = reportTaskDetails;
+    public IJobResourceController getJobResourceController() {
+        return jobResourceConstraint;
     }
 
     private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int 
index, V value) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobStatus.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobStatus.java
index 4351e39..177675b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobStatus.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobStatus.java
@@ -20,6 +20,7 @@
 
 public enum JobStatus {
     INITIALIZED,
+    WAITING,
     RUNNING,
     TERMINATED,
     FAILURE,
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobResourceController.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobResourceController.java
new file mode 100644
index 0000000..ef51617
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobResourceController.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.job.resource;
+
+import org.apache.hyracks.api.job.IComputationResource;
+
+public class DefaultJobResourceController implements IJobResourceController {
+
+    public static DefaultJobResourceController INSTANCE = new 
DefaultJobResourceController();
+
+    private DefaultJobResourceController() {
+    }
+
+    @Override
+    public boolean allocate(IComputationResource inputComputationResource) {
+        return true;
+    }
+
+    @Override
+    public void release(IComputationResource inputComputationResource) {
+
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobResourceController.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobResourceController.java
new file mode 100644
index 0000000..95c078d
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobResourceController.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.job.resource;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.job.IComputationResource;
+
+public interface IJobResourceController extends Serializable {
+
+    boolean allocate(IComputationResource inputComputationResource);
+
+    void release(IComputationResource inputComputationResource);
+
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/MemoryOnlyJobResourceController.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/MemoryOnlyJobResourceController.java
new file mode 100644
index 0000000..521ce3e
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/MemoryOnlyJobResourceController.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.job.resource;
+
+import org.apache.hyracks.api.job.IComputationResource;
+
+public class MemoryOnlyJobResourceController implements IJobResourceController 
{
+
+    private final long aggregatedRAMRequirement;
+
+    public MemoryOnlyJobResourceController(long aggregatedRAMRequirement) {
+        this.aggregatedRAMRequirement = aggregatedRAMRequirement;
+    }
+
+    @Override
+    public boolean allocate(IComputationResource inputComputationResource) {
+        return true;
+    }
+
+    @Override
+    public void release(IComputationResource inputComputationResource) {
+
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 5fdcede..e41d8dc 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -50,6 +50,7 @@
 import org.apache.hyracks.control.cc.application.CCApplicationContext;
 import org.apache.hyracks.control.cc.dataset.DatasetDirectoryService;
 import org.apache.hyracks.control.cc.dataset.IDatasetDirectoryService;
+import org.apache.hyracks.control.cc.job.JobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.cc.web.WebServer;
 import org.apache.hyracks.control.cc.work.GatherStateDumpsWork.StateDumpRun;
@@ -95,12 +96,6 @@
 
     private CCApplicationContext appCtx;
 
-    private final Map<JobId, JobRun> activeRunMap;
-
-    private final Map<JobId, JobRun> runMapArchive;
-
-    private final Map<JobId, List<Exception>> runMapHistory;
-
     private final WorkQueue workQueue;
 
     private ExecutorService executor;
@@ -118,6 +113,8 @@
     private final Map<String, StateDumpRun> stateDumpRunMap;
 
     private final Map<String, ThreadDumpRun> threadDumpRunMap;
+
+    private final JobManager jobManager;
 
     private ShutdownRun shutdownCallback;
 
@@ -137,25 +134,7 @@
         clientIPC = new IPCSystem(new 
InetSocketAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort), ciIPCI,
                 new JavaSerializationBasedPayloadSerializerDeserializer());
         webServer = new WebServer(this);
-        activeRunMap = new HashMap<>();
-        runMapArchive = new LinkedHashMap<JobId, JobRun>() {
-            private static final long serialVersionUID = 1L;
 
-            @Override
-            protected boolean removeEldestEntry(Map.Entry<JobId, JobRun> 
eldest) {
-                return size() > ccConfig.jobHistorySize;
-            }
-        };
-        runMapHistory = new LinkedHashMap<JobId, List<Exception>>() {
-            private static final long serialVersionUID = 1L;
-            /** history size + 1 is for the case when history size = 0 */
-            private int allowedSize = 100 * (ccConfig.jobHistorySize + 1);
-
-            @Override
-            protected boolean removeEldestEntry(Map.Entry<JobId, 
List<Exception>> eldest) {
-                return size() > allowedSize;
-            }
-        };
         // WorkQueue is in charge of heartbeat as well as other events.
         workQueue = new WorkQueue("ClusterController", Thread.MAX_PRIORITY);
         this.timer = new Timer(true);
@@ -167,6 +146,9 @@
         deploymentRunMap = new HashMap<>();
         stateDumpRunMap = new HashMap<>();
         threadDumpRunMap = Collections.synchronizedMap(new HashMap<>());
+
+        // Job manager is in charge of job lifecycle management.
+        jobManager = new JobManager(ccConfig, this);
     }
 
     private static ClusterTopology computeClusterTopology(CCConfig ccConfig) 
throws Exception {
@@ -301,16 +283,8 @@
         return ccContext;
     }
 
-    public Map<JobId, JobRun> getActiveRunMap() {
-        return activeRunMap;
-    }
-
-    public Map<JobId, JobRun> getRunMapArchive() {
-        return runMapArchive;
-    }
-
-    public Map<JobId, List<Exception>> getRunHistory() {
-        return runMapHistory;
+    public JobManager getJobManager() {
+        return jobManager;
     }
 
     public Map<InetAddress, Set<String>> getIpAddressNodeNameMap() {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
new file mode 100644
index 0000000..f788d7b
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.cc.job;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
+import org.apache.hyracks.api.job.IComputationResource;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobResourceController;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.application.CCApplicationContext;
+import org.apache.hyracks.control.cc.work.JobCleanupWork;
+import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.work.IResultCallback;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+public class JobManager {
+
+    private static final Logger LOGGER = 
Logger.getLogger(JobManager.class.getName());
+
+    private final ClusterControllerService ccs;
+    private final Queue<JobRun> jobQueue;
+    private final Map<JobId, JobRun> activeRunMap;
+    private final Map<JobId, JobRun> runMapArchive;
+    private final Map<JobId, List<Exception>> runMapHistory;
+    private final IComputationResource computationResource = null;
+
+    public JobManager(CCConfig ccConfig, ClusterControllerService ccs) {
+        this.ccs = ccs;
+        jobQueue = new ArrayDeque<>();
+        activeRunMap = new HashMap<>();
+        runMapArchive = new LinkedHashMap<JobId, JobRun>() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            protected boolean removeEldestEntry(Map.Entry<JobId, JobRun> 
eldest) {
+                return size() > ccConfig.jobHistorySize;
+            }
+        };
+        runMapHistory = new LinkedHashMap<JobId, List<Exception>>() {
+            private static final long serialVersionUID = 1L;
+            /** history size + 1 is for the case when history size = 0 */
+            private int allowedSize = 100 * (ccConfig.jobHistorySize + 1);
+
+            @Override
+            protected boolean removeEldestEntry(Map.Entry<JobId, 
List<Exception>> eldest) {
+                return size() > allowedSize;
+            }
+        };
+    }
+
+    public Collection<JobRun> getAllRunningJobs() {
+        return activeRunMap.values();
+    }
+
+    public Collection<JobRun> getArchivedJobs() {
+        return runMapArchive.values();
+    }
+
+    public JobRun getJob(JobId jobId) {
+        JobRun jobRun = activeRunMap.get(jobId);
+        if (jobRun == null) {
+            jobRun = runMapArchive.get(jobId);
+        }
+        return jobRun;
+    }
+
+    public void addJob(JobRun jobRun) {
+        IJobResourceController resourceController = 
jobRun.getActivityClusterGraphFactory().getJobSpecification()
+                .getJobResourceController();
+        if (resourceController.allocate(computationResource)) {
+            executeJob(jobRun);
+        } else {
+            jobRun.setStatus(JobStatus.WAITING, null);
+            jobQueue.add(jobRun);
+        }
+    }
+
+    public void completeJob(JobRun run) {
+        JobId jobId = run.getJobId();
+        CCApplicationContext appCtx = ccs.getApplicationContext();
+        if (appCtx != null) {
+            try {
+                appCtx.notifyJobFinish(jobId);
+            } catch (HyracksException e) {
+                e.printStackTrace();
+            }
+        }
+        run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
+        run.setEndTime(System.currentTimeMillis());
+        activeRunMap.remove(jobId);
+        runMapArchive.put(jobId, run);
+        runMapHistory.put(jobId, run.getExceptions());
+
+        if (run.getActivityClusterGraph().isReportTaskDetails()) {
+            /**
+             * log job details when profiling is enabled
+             */
+            try {
+                ccs.getJobLogFile().log(createJobLogObject(run));
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        // Releases computation resources occupied by the job.
+        IJobResourceController resourceController = 
run.getActivityClusterGraphFactory().getJobSpecification()
+                .getJobResourceController();
+        resourceController.release(computationResource);
+
+        // Picks the next job to execute.
+        pickNextJobToRun();
+    }
+
+    public void terminateJob(JobId jobId, JobStatus status, List<Exception> 
exceptions) {
+        JobRun run = getJob(jobId);
+        if (run == null) {
+            LOGGER.warning("Unable to find JobRun with id: " + jobId);
+            return;
+        }
+        if (run.getPendingStatus() != null && 
run.getCleanupPendingNodeIds().isEmpty()) {
+            completeJob(run);
+            return;
+        }
+        if (run.getPendingStatus() != null) {
+            LOGGER.warning("Ignoring duplicate cleanup for JobRun with id: " + 
jobId);
+            return;
+        }
+        Set<String> targetNodes = run.getParticipatingNodeIds();
+        run.getCleanupPendingNodeIds().addAll(targetNodes);
+        if (run.getPendingStatus() != JobStatus.FAILURE && 
run.getPendingStatus() != JobStatus.TERMINATED) {
+            run.setPendingStatus(status, exceptions);
+        }
+        if (targetNodes != null && !targetNodes.isEmpty()) {
+            Set<String> toDelete = new HashSet<>();
+            for (String n : targetNodes) {
+                NodeControllerState ncs = ccs.getNodeMap().get(n);
+                try {
+                    if (ncs == null) {
+                        toDelete.add(n);
+                    } else {
+                        ncs.getNodeController().cleanUpJoblet(jobId, status);
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+            targetNodes.removeAll(toDelete);
+            run.getCleanupPendingNodeIds().removeAll(toDelete);
+            if (run.getCleanupPendingNodeIds().isEmpty()) {
+                completeJob(run);
+            }
+        } else {
+            completeJob(run);
+        }
+    }
+
+    public List<Exception> getRunHistory(JobId jobId) {
+        List<Exception> exceptions = runMapHistory.get(jobId);
+        return exceptions;
+    }
+
+    private void pickNextJobToRun() {
+        for (JobRun run : jobQueue) {
+            IJobResourceController resourceController = 
run.getActivityClusterGraphFactory().getJobSpecification()
+                    .getJobResourceController();
+            if (resourceController.allocate(computationResource)) {
+                executeJob(run);
+            }
+        }
+    }
+
+    // Executes a job when the resource for the job is available.
+    private void executeJob(JobRun run) {
+        IResultCallback<JobId> callback = run.getCallback();
+        try {
+            //run.setStatus(JobStatus.INITIALIZED, null);
+            run.setStartTime(System.currentTimeMillis());
+            JobId jobId = run.getJobId();
+            activeRunMap.put(jobId, run);
+
+            CCApplicationContext appCtx = ccs.getApplicationContext();
+            IActivityClusterGraphGeneratorFactory acggf = 
run.getActivityClusterGraphFactory();
+            appCtx.notifyJobCreation(jobId, acggf);
+            run.setStatus(JobStatus.RUNNING, null);
+            try {
+                run.getExecutor().startJob();
+            } catch (Exception e) {
+                ccs.getWorkQueue().schedule(
+                        new JobCleanupWork(ccs, run.getJobId(), 
JobStatus.FAILURE, Collections.singletonList(e)));
+            }
+            callback.setValue(jobId);
+        } catch (Exception e) {
+            callback.setException(e);
+        }
+    }
+
+    private ObjectNode createJobLogObject(final JobRun run) {
+        ObjectMapper om = new ObjectMapper();
+        ObjectNode jobLogObject = om.createObjectNode();
+        ActivityClusterGraph acg = run.getActivityClusterGraph();
+        jobLogObject.set("activity-cluster-graph", acg.toJSON());
+        jobLogObject.set("job-run", run.toJSON());
+        return jobLogObject;
+    }
+
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
index f1d04bb..7e422e4 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
@@ -20,7 +20,6 @@
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
-import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -42,6 +41,7 @@
 import org.apache.hyracks.api.job.ActivityClusterGraph;
 import org.apache.hyracks.api.job.ActivityClusterId;
 import org.apache.hyracks.api.job.IActivityClusterGraphGenerator;
+import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobStatus;
@@ -49,20 +49,23 @@
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.partitions.PartitionMatchMaker;
 import org.apache.hyracks.control.cc.scheduler.ActivityPartitionDetails;
-import org.apache.hyracks.control.cc.scheduler.JobScheduler;
+import org.apache.hyracks.control.cc.scheduler.JobExecutor;
 import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
 import org.apache.hyracks.control.common.utils.ExceptionUtils;
+import org.apache.hyracks.control.common.work.IResultCallback;
 
 public class JobRun implements IJobStatusConditionVariable {
     private final DeploymentId deploymentId;
 
     private final JobId jobId;
 
+    private final IActivityClusterGraphGeneratorFactory acggf;
+
     private final IActivityClusterGraphGenerator acgg;
 
     private final ActivityClusterGraph acg;
 
-    private final JobScheduler scheduler;
+    private final JobExecutor scheduler;
 
     private final EnumSet<JobFlag> jobFlags;
 
@@ -94,21 +97,26 @@
 
     private Map<OperatorDescriptorId, Map<Integer, String>> operatorLocations;
 
+    private final IResultCallback<JobId> callback;
+
     public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, 
JobId jobId,
-            IActivityClusterGraphGenerator acgg, EnumSet<JobFlag> jobFlags) {
+            IActivityClusterGraphGeneratorFactory acggf, 
IActivityClusterGraphGenerator acgg, EnumSet<JobFlag> jobFlags,
+            IResultCallback<JobId> callback) {
         this.deploymentId = deploymentId;
         this.jobId = jobId;
+        this.acggf = acggf;
         this.acgg = acgg;
         this.acg = acgg.initialize();
-        this.scheduler = new JobScheduler(ccs, this, acgg.getConstraints());
+        this.scheduler = new JobExecutor(ccs, this, acgg.getConstraints());
         this.jobFlags = jobFlags;
-        activityClusterPlanMap = new HashMap<ActivityClusterId, 
ActivityClusterPlan>();
+        this.callback = callback;
+        activityClusterPlanMap = new HashMap<>();
         pmm = new PartitionMatchMaker();
-        participatingNodeIds = new HashSet<String>();
-        cleanupPendingNodeIds = new HashSet<String>();
+        participatingNodeIds = new HashSet<>();
+        cleanupPendingNodeIds = new HashSet<>();
         profile = new JobProfile(jobId);
-        connectorPolicyMap = new HashMap<ConnectorDescriptorId, 
IConnectorPolicy>();
-        operatorLocations = new HashMap<OperatorDescriptorId, Map<Integer, 
String>>();
+        connectorPolicyMap = new HashMap<>();
+        operatorLocations = new HashMap<>();
         createTime = System.currentTimeMillis();
     }
 
@@ -118,6 +126,10 @@
 
     public JobId getJobId() {
         return jobId;
+    }
+
+    public IActivityClusterGraphGeneratorFactory 
getActivityClusterGraphFactory() {
+        return acggf;
     }
 
     public ActivityClusterGraph getActivityClusterGraph() {
@@ -167,8 +179,8 @@
         return createTime;
     }
 
-    public void setCreateTime(long createTime) {
-        this.createTime = createTime;
+    public IResultCallback<JobId> getCallback() {
+        return callback;
     }
 
     public long getStartTime() {
@@ -228,7 +240,7 @@
         return profile;
     }
 
-    public JobScheduler getScheduler() {
+    public JobExecutor getExecutor() {
         return scheduler;
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
index c13a458..1faa9c8 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
@@ -55,11 +55,11 @@
 public class ActivityClusterPlanner {
     private static final Logger LOGGER = 
Logger.getLogger(ActivityClusterPlanner.class.getName());
 
-    private final JobScheduler scheduler;
+    private final JobExecutor scheduler;
 
     private final Map<PartitionId, TaskCluster> 
partitionProducingTaskClusterMap;
 
-    public ActivityClusterPlanner(JobScheduler newJobScheduler) {
+    public ActivityClusterPlanner(JobExecutor newJobScheduler) {
         this.scheduler = newJobScheduler;
         partitionProducingTaskClusterMap = new HashMap<PartitionId, 
TaskCluster>();
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobExecutor.java
similarity index 94%
rename from 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java
rename to 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobExecutor.java
index b577ff7..7ac4261 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobExecutor.java
@@ -31,8 +31,7 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.hyracks.control.cc.job.JobManager;
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.constraints.Constraint;
 import 
org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression;
@@ -54,7 +53,6 @@
 import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
-import org.apache.hyracks.control.cc.application.CCApplicationContext;
 import org.apache.hyracks.control.cc.job.ActivityClusterPlan;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.cc.job.Task;
@@ -66,8 +64,8 @@
 import org.apache.hyracks.control.common.job.PartitionState;
 import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
 
-public class JobScheduler {
-    private static final Logger LOGGER = 
Logger.getLogger(JobScheduler.class.getName());
+public class JobExecutor {
+    private static final Logger LOGGER = 
Logger.getLogger(JobExecutor.class.getName());
 
     private final ClusterControllerService ccs;
 
@@ -80,7 +78,7 @@
     private final Set<TaskCluster> inProgressTaskClusters;
 
 
-    public JobScheduler(ClusterControllerService ccs, JobRun jobRun, 
Collection<Constraint> constraints) {
+    public JobExecutor(ClusterControllerService ccs, JobRun jobRun, 
Collection<Constraint> constraints) {
         this.ccs = ccs;
         this.jobRun = jobRun;
         solver = new PartitionConstraintSolver();
@@ -666,7 +664,8 @@
             jobRun.getParticipatingNodeIds().removeAll(deadNodes);
             jobRun.getCleanupPendingNodeIds().removeAll(deadNodes);
             if (jobRun.getPendingStatus() != null && 
jobRun.getCleanupPendingNodeIds().isEmpty()) {
-                finishJob(jobRun);
+                JobManager jobManager = ccs.getJobManager();
+                jobManager.completeJob(jobRun);
                 return;
             }
             for (ActivityCluster ac : 
jobRun.getActivityClusterGraph().getActivityClusterMap().values()) {
@@ -706,40 +705,4 @@
         }
     }
 
-    private void finishJob(final JobRun run) {
-        JobId jobId = run.getJobId();
-        CCApplicationContext appCtx = ccs.getApplicationContext();
-        if (appCtx != null) {
-            try {
-                appCtx.notifyJobFinish(jobId);
-            } catch (HyracksException e) {
-                e.printStackTrace();
-            }
-        }
-        run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
-        run.setEndTime(System.currentTimeMillis());
-        ccs.getActiveRunMap().remove(jobId);
-        ccs.getRunMapArchive().put(jobId, run);
-        ccs.getRunHistory().put(jobId, run.getExceptions());
-
-        if (run.getActivityClusterGraph().isReportTaskDetails()) {
-            /**
-             * log job details when task-profiling is enabled
-             */
-            try {
-                ccs.getJobLogFile().log(createJobLogObject(run));
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
-    private ObjectNode createJobLogObject(final JobRun run) {
-        ObjectMapper om = new ObjectMapper();
-        ObjectNode jobLogObject = om.createObjectNode();
-        ActivityClusterGraph acg = run.getActivityClusterGraph();
-        jobLogObject.set("activity-cluster-graph", acg.toJSON());
-        jobLogObject.set("job-run", run.toJSON());
-        return jobLogObject;
-    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
index 9134a91..b78c3e2 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
@@ -28,6 +28,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.job.ActivityPlan;
+import org.apache.hyracks.control.cc.job.JobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.cc.job.Task;
 import org.apache.hyracks.control.cc.job.TaskAttempt;
@@ -50,7 +51,8 @@
 
     @Override
     public final void runWork() {
-        JobRun run = ccs.getActiveRunMap().get(jobId);
+        JobManager jobManager = ccs.getJobManager();
+        JobRun run = jobManager.getJob(jobId);
         if (run != null) {
             TaskId tid = taId.getTaskId();
             Map<ActivityId, ActivityCluster> activityClusterMap = 
run.getActivityClusterGraph().getActivityMap();
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetActivityClusterGraphJSONWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetActivityClusterGraphJSONWork.java
index 294ae97..64882e0 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetActivityClusterGraphJSONWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetActivityClusterGraphJSONWork.java
@@ -18,13 +18,14 @@
  */
 package org.apache.hyracks.control.cc.work;
 
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.common.work.SynchronizableWork;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.hyracks.control.cc.job.JobManager;
 
 import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.job.JobRun;
-import org.apache.hyracks.control.common.work.SynchronizableWork;
 
 public class GetActivityClusterGraphJSONWork extends SynchronizableWork {
     private final ClusterControllerService ccs;
@@ -38,15 +39,12 @@
 
     @Override
     protected void doRun() throws Exception {
-
+        JobManager jobManager = ccs.getJobManager();
         ObjectMapper om = new ObjectMapper();
-        JobRun run = ccs.getActiveRunMap().get(jobId);
+        JobRun run = jobManager.getJob(jobId);
         if (run == null) {
-            run = ccs.getRunMapArchive().get(jobId);
-            if (run == null) {
-                json = om.createObjectNode();
-                return;
-            }
+            json = om.createObjectNode();
+            return;
         }
         json = run.getActivityClusterGraph().toJSON();
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobInfoWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobInfoWork.java
index e072c21..f54a5b2 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobInfoWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobInfoWork.java
@@ -21,6 +21,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobInfo;
 import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.JobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.common.work.IResultCallback;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
@@ -39,10 +40,8 @@
     @Override
     protected void doRun() throws Exception {
         try {
-            JobRun run = ccs.getActiveRunMap().get(jobId);
-            if (run == null) {
-                run = ccs.getRunMapArchive().get(jobId);
-            }
+            JobManager jobManager = ccs.getJobManager();
+            JobRun run = jobManager.getJob(jobId);
             JobInfo info = (run != null) ? new JobInfo(run.getJobId(), 
run.getStatus(), run.getOperatorLocations())
                     : null;
             callback.setValue(info);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobRunJSONWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobRunJSONWork.java
index aad6edf..40fee46 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobRunJSONWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobRunJSONWork.java
@@ -18,13 +18,14 @@
  */
 package org.apache.hyracks.control.cc.work;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.JobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 
 public class GetJobRunJSONWork extends SynchronizableWork {
     private final ClusterControllerService ccs;
@@ -38,14 +39,12 @@
 
     @Override
     protected void doRun() throws Exception {
+        JobManager jobManager = ccs.getJobManager();
         ObjectMapper om = new ObjectMapper();
-        JobRun run = ccs.getActiveRunMap().get(jobId);
+        JobRun run = jobManager.getJob(jobId);
         if (run == null) {
-            run = ccs.getRunMapArchive().get(jobId);
-            if (run == null) {
-                json = om.createObjectNode();
-                return;
-            }
+            json = om.createObjectNode();
+            return;
         }
         json = run.toJSON();
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobStatusWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobStatusWork.java
index d45a9cc..72ff27e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobStatusWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobStatusWork.java
@@ -21,6 +21,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.JobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.common.work.IResultCallback;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
@@ -39,10 +40,8 @@
     @Override
     protected void doRun() throws Exception {
         try {
-            JobRun run = ccs.getActiveRunMap().get(jobId);
-            if (run == null) {
-                run = ccs.getRunMapArchive().get(jobId);
-            }
+            JobManager jobManager = ccs.getJobManager();
+            JobRun run = jobManager.getJob(jobId);
             JobStatus status = run == null ? null : run.getStatus();
             callback.setValue(status);
         } catch (Exception e) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobSummariesJSONWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobSummariesJSONWork.java
index 1e5a3a5..681ee39 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobSummariesJSONWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobSummariesJSONWork.java
@@ -23,6 +23,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.hyracks.control.cc.job.JobManager;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
@@ -39,8 +40,9 @@
     protected void doRun() throws Exception {
         ObjectMapper om = new ObjectMapper();
         summaries = om.createArrayNode();
-        populateJSON(ccs.getActiveRunMap().values());
-        populateJSON(ccs.getRunMapArchive().values());
+        JobManager jobManager = ccs.getJobManager();
+        populateJSON(jobManager.getAllRunningJobs());
+        populateJSON(jobManager.getArchivedJobs());
     }
 
     private void populateJSON(Collection<JobRun> jobRuns)  {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
index 2a383b6..b8f5454 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
@@ -18,21 +18,13 @@
  */
 package org.apache.hyracks.control.cc.work;
 
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import java.util.logging.Logger;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.ActivityClusterGraph;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.NodeControllerState;
-import org.apache.hyracks.control.cc.application.CCApplicationContext;
-import org.apache.hyracks.control.cc.job.JobRun;
+import org.apache.hyracks.control.cc.job.JobManager;
 import org.apache.hyracks.control.common.work.AbstractWork;
 
 public class JobCleanupWork extends AbstractWork {
@@ -53,82 +45,8 @@
     @Override
     public void run() {
         LOGGER.info("Cleanup for JobRun with id: " + jobId);
-        final JobRun run = ccs.getActiveRunMap().get(jobId);
-        if (run == null) {
-            LOGGER.warning("Unable to find JobRun with id: " + jobId);
-            return;
-        }
-        if (run.getPendingStatus() != null && 
run.getCleanupPendingNodeIds().isEmpty()) {
-            finishJob(run);
-            return;
-        }
-        if (run.getPendingStatus() != null) {
-            LOGGER.warning("Ignoring duplicate cleanup for JobRun with id: " + 
jobId);
-            return;
-        }
-        Set<String> targetNodes = run.getParticipatingNodeIds();
-        run.getCleanupPendingNodeIds().addAll(targetNodes);
-        if (run.getPendingStatus() != JobStatus.FAILURE && 
run.getPendingStatus() != JobStatus.TERMINATED) {
-            run.setPendingStatus(status, exceptions);
-        }
-        if (targetNodes != null && !targetNodes.isEmpty()) {
-            Set<String> toDelete = new HashSet<String>();
-            for (String n : targetNodes) {
-                NodeControllerState ncs = ccs.getNodeMap().get(n);
-                try {
-                    if (ncs == null) {
-                        toDelete.add(n);
-                    } else {
-                        ncs.getNodeController().cleanUpJoblet(jobId, status);
-                    }
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-            }
-            targetNodes.removeAll(toDelete);
-            run.getCleanupPendingNodeIds().removeAll(toDelete);
-            if (run.getCleanupPendingNodeIds().isEmpty()) {
-                finishJob(run);
-            }
-        } else {
-            finishJob(run);
-        }
-    }
-
-    private void finishJob(final JobRun run) {
-        CCApplicationContext appCtx = ccs.getApplicationContext();
-        if (appCtx != null) {
-            try {
-                appCtx.notifyJobFinish(jobId);
-            } catch (HyracksException e) {
-                e.printStackTrace();
-            }
-        }
-        run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
-        run.setEndTime(System.currentTimeMillis());
-        ccs.getActiveRunMap().remove(jobId);
-        ccs.getRunMapArchive().put(jobId, run);
-        ccs.getRunHistory().put(jobId, run.getExceptions());
-
-        if (run.getActivityClusterGraph().isReportTaskDetails()) {
-            /**
-             * log job details when profiling is enabled
-             */
-            try {
-                ccs.getJobLogFile().log(createJobLogObject(run));
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
-    private ObjectNode createJobLogObject(final JobRun run) {
-        ObjectMapper om = new ObjectMapper();
-        ObjectNode jobLogObject = om.createObjectNode();
-        ActivityClusterGraph acg = run.getActivityClusterGraph();
-        jobLogObject.set("activity-cluster-graph", acg.toJSON());
-        jobLogObject.set("job-run", run.toJSON());
-        return jobLogObject;
+        JobManager jobManager = ccs.getJobManager();
+        jobManager.terminateJob(jobId, status, exceptions);
     }
 
     @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
index e7844e9..df9315f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
@@ -29,6 +29,7 @@
 import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.application.CCApplicationContext;
+import org.apache.hyracks.control.cc.job.JobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.common.deployment.DeploymentUtils;
 import org.apache.hyracks.control.common.work.IResultCallback;
@@ -59,19 +60,9 @@
             IActivityClusterGraphGeneratorFactory acggf = 
(IActivityClusterGraphGeneratorFactory) DeploymentUtils
                     .deserialize(acggfBytes, deploymentId, appCtx);
             IActivityClusterGraphGenerator acgg = 
acggf.createActivityClusterGraphGenerator(jobId, appCtx, jobFlags);
-            JobRun run = new JobRun(ccs, deploymentId, jobId, acgg, jobFlags);
-            run.setStatus(JobStatus.INITIALIZED, null);
-            run.setStartTime(System.currentTimeMillis());
-            ccs.getActiveRunMap().put(jobId, run);
-            appCtx.notifyJobCreation(jobId, acggf);
-            run.setStatus(JobStatus.RUNNING, null);
-            try {
-                run.getScheduler().startJob();
-            } catch (Exception e) {
-                ccs.getWorkQueue().schedule(
-                        new JobCleanupWork(ccs, run.getJobId(), 
JobStatus.FAILURE, Collections.singletonList(e)));
-            }
-            callback.setValue(jobId);
+            JobRun run = new JobRun(ccs, deploymentId, jobId, acggf, acgg, 
jobFlags, callback);
+            JobManager jobManager = ccs.getJobManager();
+            jobManager.addJob(run);
         } catch (Exception e) {
             callback.setException(e);
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
index 603b6f8..52e4960 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
@@ -22,11 +22,10 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
-import org.apache.hyracks.control.cc.application.CCApplicationContext;
+import org.apache.hyracks.control.cc.job.JobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 
 public class JobletCleanupNotificationWork extends AbstractHeartbeatWork {
@@ -45,7 +44,8 @@
 
     @Override
     public void runWork() {
-        final JobRun run = ccs.getActiveRunMap().get(jobId);
+        JobManager jobManager = ccs.getJobManager();
+        final JobRun run = jobManager.getJob(jobId);
         Set<String> cleanupPendingNodes = run.getCleanupPendingNodeIds();
         if (!cleanupPendingNodes.remove(nodeId)) {
             if (LOGGER.isLoggable(Level.WARNING)) {
@@ -59,19 +59,7 @@
             ncs.getActiveJobIds().remove(jobId);
         }
         if (cleanupPendingNodes.isEmpty()) {
-            CCApplicationContext appCtx = ccs.getApplicationContext();
-            if (appCtx != null) {
-                try {
-                    appCtx.notifyJobFinish(jobId);
-                } catch (HyracksException e) {
-                    e.printStackTrace();
-                }
-            }
-            run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
-            run.setEndTime(System.currentTimeMillis());
-            ccs.getActiveRunMap().remove(jobId);
-            ccs.getRunMapArchive().put(jobId, run);
-            ccs.getRunHistory().put(jobId, run.getExceptions());
+            jobManager.completeJob(run);
         }
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
index 2c5c965..4cac253 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
@@ -24,6 +24,7 @@
 
 import org.apache.hyracks.api.partitions.PartitionId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.JobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.cc.partitions.PartitionMatchMaker;
 import org.apache.hyracks.control.cc.partitions.PartitionUtils;
@@ -43,7 +44,8 @@
     @Override
     public void run() {
         final PartitionId pid = partitionDescriptor.getPartitionId();
-        JobRun run = ccs.getActiveRunMap().get(pid.getJobId());
+        JobManager jobManager = ccs.getJobManager();
+        JobRun run = jobManager.getJob(pid.getJobId());
         if (run == null) {
             return;
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterPartitionRequestWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterPartitionRequestWork.java
index 44fc40d..0a5d31d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterPartitionRequestWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterPartitionRequestWork.java
@@ -22,6 +22,7 @@
 
 import org.apache.hyracks.api.partitions.PartitionId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.JobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.cc.partitions.PartitionMatchMaker;
 import org.apache.hyracks.control.cc.partitions.PartitionUtils;
@@ -41,7 +42,8 @@
     @Override
     public void run() {
         PartitionId pid = partitionRequest.getPartitionId();
-        JobRun run = ccs.getActiveRunMap().get(pid.getJobId());
+        JobManager jobManager = ccs.getJobManager();
+        JobRun run = jobManager.getJob(pid.getJobId());
         if (run == null) {
             return;
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
index 510c729..5850f44 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
@@ -28,6 +28,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.job.JobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.common.work.AbstractWork;
 
@@ -63,10 +64,11 @@
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("Number of affected jobs: " + size);
             }
+            JobManager jobManager = ccs.getJobManager();
             for (JobId jobId : affectedJobIds) {
-                JobRun run = ccs.getActiveRunMap().get(jobId);
+                JobRun run = jobManager.getJob(jobId);
                 if (run != null) {
-                    run.getScheduler().notifyNodeFailures(deadNodes);
+                    run.getExecutor().notifyNodeFailures(deadNodes);
                 }
             }
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportProfilesWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportProfilesWork.java
index 2278389..de19b75 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportProfilesWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportProfilesWork.java
@@ -24,6 +24,7 @@
 
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.JobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
 import org.apache.hyracks.control.common.work.AbstractWork;
@@ -39,9 +40,9 @@
 
     @Override
     public void run() {
-        Map<JobId, JobRun> runMap = ccs.getActiveRunMap();
+        JobManager jobManager = ccs.getJobManager();
         for (JobProfile profile : profiles) {
-            JobRun run = runMap.get(profile.getJobId());
+            JobRun run = jobManager.getJob(profile.getJobId());
             if (run != null) {
                 JobProfile jp = run.getJobProfile();
                 jp.merge(profile);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java
index 2379871..45929e0 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java
@@ -24,6 +24,7 @@
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.JobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.cc.job.TaskAttempt;
 import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
@@ -42,7 +43,8 @@
     @Override
     protected void performEvent(TaskAttempt ta) {
         try {
-            JobRun run = ccs.getActiveRunMap().get(jobId);
+            JobManager jobManager = ccs.getJobManager();
+            JobRun run = jobManager.getJob(jobId);
             if (statistics != null) {
                 JobProfile jobProfile = run.getJobProfile();
                 Map<String, JobletProfile> jobletProfiles = 
jobProfile.getJobletProfiles();
@@ -53,7 +55,7 @@
                 }
                 jobletProfile.getTaskProfiles().put(taId, statistics);
             }
-            run.getScheduler().notifyTaskComplete(ta);
+            run.getExecutor().notifyTaskComplete(ta);
         } catch (HyracksException e) {
             e.printStackTrace();
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
index 8bca4e7..1f0acc8 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
@@ -23,6 +23,7 @@
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.JobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.cc.job.TaskAttempt;
 
@@ -37,9 +38,10 @@
 
     @Override
     protected void performEvent(TaskAttempt ta) {
-        JobRun run = ccs.getActiveRunMap().get(jobId);
+        JobManager jobManager = ccs.getJobManager();
+        JobRun run = jobManager.getJob(jobId);
         ccs.getDatasetDirectoryService().reportJobFailure(jobId, exceptions);
-        run.getScheduler().notifyTaskFailure(ta, exceptions);
+        run.getExecutor().notifyTaskFailure(ta, exceptions);
     }
 
     @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
index c1fa945..de42561 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
@@ -23,6 +23,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.job.IJobStatusConditionVariable;
+import org.apache.hyracks.control.cc.job.JobManager;
 import org.apache.hyracks.control.common.work.IResultCallback;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
 
@@ -39,7 +40,8 @@
 
     @Override
     protected void doRun() throws Exception {
-        final IJobStatusConditionVariable cRunningVar = 
ccs.getActiveRunMap().get(jobId);
+        JobManager jobManager = ccs.getJobManager();
+        final IJobStatusConditionVariable cRunningVar = 
jobManager.getJob(jobId);
         if (cRunningVar != null) {
             ccs.getExecutor().execute(new Runnable() {
                 @Override
@@ -53,7 +55,7 @@
                 }
             });
         } else {
-            final IJobStatusConditionVariable cArchivedVar = 
ccs.getRunMapArchive().get(jobId);
+            final IJobStatusConditionVariable cArchivedVar = 
jobManager.getJob(jobId);
             if (cArchivedVar != null) {
                 ccs.getExecutor().execute(new Runnable() {
                     @Override
@@ -67,7 +69,7 @@
                     }
                 });
             } else {
-                final List<Exception> exceptions = 
ccs.getRunHistory().get(jobId);
+                final List<Exception> exceptions = 
jobManager.getRunHistory(jobId);
                 ccs.getExecutor().execute(new Runnable() {
                     @Override
                     public void run() {

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1424
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I8fb6fda57efa139114dd234e08cc7de7129468c8
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Yingyi Bu <[email protected]>

Reply via email to