Yingyi Bu has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1424
Change subject: WIP: concurrent query management support.
......................................................................
WIP: concurrent query management support.
Change-Id: I8fb6fda57efa139114dd234e08cc7de7129468c8
---
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
A
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IComputationResource.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobStatus.java
A
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobResourceController.java
A
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobResourceController.java
A
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/MemoryOnlyJobResourceController.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
A
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
R
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobExecutor.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetActivityClusterGraphJSONWork.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobInfoWork.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobRunJSONWork.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobStatusWork.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobSummariesJSONWork.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterPartitionRequestWork.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportProfilesWork.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
28 files changed, 518 insertions(+), 267 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/24/1424/1
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
index 00c2cc4..9a908ab 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
@@ -75,10 +75,8 @@
acg.setFrameSize(spec.getFrameSize());
acg.setMaxReattempts(spec.getMaxReattempts());
acg.setJobletEventListenerFactory(spec.getJobletEventListenerFactory());
- acg.setGlobalJobDataFactory(spec.getGlobalJobDataFactory());
acg.setConnectorPolicyAssignmentPolicy(spec.getConnectorPolicyAssignmentPolicy());
acg.setUseConnectorPolicyForScheduling(spec.isUseConnectorPolicyForScheduling());
- acg.setReportTaskDetails(spec.isReportTaskDetails());
final Set<Constraint> constraints = new HashSet<Constraint>();
final IConstraintAcceptor acceptor = new IConstraintAcceptor() {
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IComputationResource.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IComputationResource.java
new file mode 100644
index 0000000..deaf07e
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IComputationResource.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.job;
+
+public interface IComputationResource {
+
+ long DENY_ANYTHING_RESOURCE_COUNT = -1;
+
+ default long getAggregatedMemoryByteSize() {
+ return DENY_ANYTHING_RESOURCE_COUNT;
+ }
+
+ default long getAggregatedAvailableCores() {
+ return DENY_ANYTHING_RESOURCE_COUNT;
+ }
+
+ default long getMemoryByteSize(String nodeId) {
+ return DENY_ANYTHING_RESOURCE_COUNT;
+ }
+
+ default long getAvailableCores(String nodeId) {
+ return DENY_ANYTHING_RESOURCE_COUNT;
+ }
+
+ void setAggregatedMemoryByteSize(long aggregatedMemoryByteSize);
+
+ void setAggregaedAvailableCores(int aggregatedAvailableCores);
+
+ void setMemoryByteSize(String nodeId, long memoryByteSize);
+
+ void setAvailableCores(String nodeId, int availableCores);
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
index 84a961e..40287b7 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
@@ -28,7 +28,6 @@
import java.util.Map;
import java.util.Set;
-import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -44,6 +43,8 @@
import
org.apache.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.job.resource.DefaultJobResourceController;
+import org.apache.hyracks.api.job.resource.IJobResourceController;
public class JobSpecification implements Serializable,
IOperatorDescriptorRegistry, IConnectorDescriptorRegistry {
private static final long serialVersionUID = 1L;
@@ -76,11 +77,9 @@
private IJobletEventListenerFactory jobletEventListenerFactory;
- private IGlobalJobDataFactory globalJobDataFactory;
-
private boolean useConnectorPolicyForScheduling;
- private boolean reportTaskDetails;
+ private IJobResourceController jobResourceConstraint;
private transient int operatorIdCounter;
@@ -106,7 +105,7 @@
connectorIdCounter = 0;
maxReattempts = 2;
useConnectorPolicyForScheduling = false;
- reportTaskDetails = true;
+ jobResourceConstraint = DefaultJobResourceController.INSTANCE;
setFrameSize(frameSize);
}
@@ -281,14 +280,6 @@
this.jobletEventListenerFactory = jobletEventListenerFactory;
}
- public IGlobalJobDataFactory getGlobalJobDataFactory() {
- return globalJobDataFactory;
- }
-
- public void setGlobalJobDataFactory(IGlobalJobDataFactory
globalJobDataFactory) {
- this.globalJobDataFactory = globalJobDataFactory;
- }
-
public boolean isUseConnectorPolicyForScheduling() {
return useConnectorPolicyForScheduling;
}
@@ -297,12 +288,12 @@
this.useConnectorPolicyForScheduling = useConnectorPolicyForScheduling;
}
- public boolean isReportTaskDetails() {
- return reportTaskDetails;
+ public void setJobResourceController(IJobResourceController
jobResourceConstraint) {
+ this.jobResourceConstraint = jobResourceConstraint;
}
- public void setReportTaskDetails(boolean reportTaskDetails) {
- this.reportTaskDetails = reportTaskDetails;
+ public IJobResourceController getJobResourceController() {
+ return jobResourceConstraint;
}
private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int
index, V value) {
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobStatus.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobStatus.java
index 4351e39..177675b 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobStatus.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobStatus.java
@@ -20,6 +20,7 @@
public enum JobStatus {
INITIALIZED,
+ WAITING,
RUNNING,
TERMINATED,
FAILURE,
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobResourceController.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobResourceController.java
new file mode 100644
index 0000000..ef51617
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobResourceController.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.job.resource;
+
+import org.apache.hyracks.api.job.IComputationResource;
+
+public class DefaultJobResourceController implements IJobResourceController {
+
+ public static DefaultJobResourceController INSTANCE = new
DefaultJobResourceController();
+
+ private DefaultJobResourceController() {
+ }
+
+ @Override
+ public boolean allocate(IComputationResource inputComputationResource) {
+ return true;
+ }
+
+ @Override
+ public void release(IComputationResource inputComputationResource) {
+
+ }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobResourceController.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobResourceController.java
new file mode 100644
index 0000000..95c078d
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobResourceController.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.job.resource;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.job.IComputationResource;
+
+public interface IJobResourceController extends Serializable {
+
+ boolean allocate(IComputationResource inputComputationResource);
+
+ void release(IComputationResource inputComputationResource);
+
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/MemoryOnlyJobResourceController.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/MemoryOnlyJobResourceController.java
new file mode 100644
index 0000000..521ce3e
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/MemoryOnlyJobResourceController.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.job.resource;
+
+import org.apache.hyracks.api.job.IComputationResource;
+
+public class MemoryOnlyJobResourceController implements IJobResourceController
{
+
+ private final long aggregatedRAMRequirement;
+
+ public MemoryOnlyJobResourceController(long aggregatedRAMRequirement) {
+ this.aggregatedRAMRequirement = aggregatedRAMRequirement;
+ }
+
+ @Override
+ public boolean allocate(IComputationResource inputComputationResource) {
+ return true;
+ }
+
+ @Override
+ public void release(IComputationResource inputComputationResource) {
+
+ }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 5fdcede..e41d8dc 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -50,6 +50,7 @@
import org.apache.hyracks.control.cc.application.CCApplicationContext;
import org.apache.hyracks.control.cc.dataset.DatasetDirectoryService;
import org.apache.hyracks.control.cc.dataset.IDatasetDirectoryService;
+import org.apache.hyracks.control.cc.job.JobManager;
import org.apache.hyracks.control.cc.job.JobRun;
import org.apache.hyracks.control.cc.web.WebServer;
import org.apache.hyracks.control.cc.work.GatherStateDumpsWork.StateDumpRun;
@@ -95,12 +96,6 @@
private CCApplicationContext appCtx;
- private final Map<JobId, JobRun> activeRunMap;
-
- private final Map<JobId, JobRun> runMapArchive;
-
- private final Map<JobId, List<Exception>> runMapHistory;
-
private final WorkQueue workQueue;
private ExecutorService executor;
@@ -118,6 +113,8 @@
private final Map<String, StateDumpRun> stateDumpRunMap;
private final Map<String, ThreadDumpRun> threadDumpRunMap;
+
+ private final JobManager jobManager;
private ShutdownRun shutdownCallback;
@@ -137,25 +134,7 @@
clientIPC = new IPCSystem(new
InetSocketAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort), ciIPCI,
new JavaSerializationBasedPayloadSerializerDeserializer());
webServer = new WebServer(this);
- activeRunMap = new HashMap<>();
- runMapArchive = new LinkedHashMap<JobId, JobRun>() {
- private static final long serialVersionUID = 1L;
- @Override
- protected boolean removeEldestEntry(Map.Entry<JobId, JobRun>
eldest) {
- return size() > ccConfig.jobHistorySize;
- }
- };
- runMapHistory = new LinkedHashMap<JobId, List<Exception>>() {
- private static final long serialVersionUID = 1L;
- /** history size + 1 is for the case when history size = 0 */
- private int allowedSize = 100 * (ccConfig.jobHistorySize + 1);
-
- @Override
- protected boolean removeEldestEntry(Map.Entry<JobId,
List<Exception>> eldest) {
- return size() > allowedSize;
- }
- };
// WorkQueue is in charge of heartbeat as well as other events.
workQueue = new WorkQueue("ClusterController", Thread.MAX_PRIORITY);
this.timer = new Timer(true);
@@ -167,6 +146,9 @@
deploymentRunMap = new HashMap<>();
stateDumpRunMap = new HashMap<>();
threadDumpRunMap = Collections.synchronizedMap(new HashMap<>());
+
+ // Job manager is in charge of job lifecycle management.
+ jobManager = new JobManager(ccConfig, this);
}
private static ClusterTopology computeClusterTopology(CCConfig ccConfig)
throws Exception {
@@ -301,16 +283,8 @@
return ccContext;
}
- public Map<JobId, JobRun> getActiveRunMap() {
- return activeRunMap;
- }
-
- public Map<JobId, JobRun> getRunMapArchive() {
- return runMapArchive;
- }
-
- public Map<JobId, List<Exception>> getRunHistory() {
- return runMapHistory;
+ public JobManager getJobManager() {
+ return jobManager;
}
public Map<InetAddress, Set<String>> getIpAddressNodeNameMap() {
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
new file mode 100644
index 0000000..f788d7b
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.cc.job;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
+import org.apache.hyracks.api.job.IComputationResource;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobResourceController;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.application.CCApplicationContext;
+import org.apache.hyracks.control.cc.work.JobCleanupWork;
+import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.work.IResultCallback;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+public class JobManager {
+
+ private static final Logger LOGGER =
Logger.getLogger(JobManager.class.getName());
+
+ private final ClusterControllerService ccs;
+ private final Queue<JobRun> jobQueue;
+ private final Map<JobId, JobRun> activeRunMap;
+ private final Map<JobId, JobRun> runMapArchive;
+ private final Map<JobId, List<Exception>> runMapHistory;
+ private final IComputationResource computationResource = null;
+
+ public JobManager(CCConfig ccConfig, ClusterControllerService ccs) {
+ this.ccs = ccs;
+ jobQueue = new ArrayDeque<>();
+ activeRunMap = new HashMap<>();
+ runMapArchive = new LinkedHashMap<JobId, JobRun>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ protected boolean removeEldestEntry(Map.Entry<JobId, JobRun>
eldest) {
+ return size() > ccConfig.jobHistorySize;
+ }
+ };
+ runMapHistory = new LinkedHashMap<JobId, List<Exception>>() {
+ private static final long serialVersionUID = 1L;
+ /** history size + 1 is for the case when history size = 0 */
+ private int allowedSize = 100 * (ccConfig.jobHistorySize + 1);
+
+ @Override
+ protected boolean removeEldestEntry(Map.Entry<JobId,
List<Exception>> eldest) {
+ return size() > allowedSize;
+ }
+ };
+ }
+
+ public Collection<JobRun> getAllRunningJobs() {
+ return activeRunMap.values();
+ }
+
+ public Collection<JobRun> getArchivedJobs() {
+ return runMapArchive.values();
+ }
+
+ public JobRun getJob(JobId jobId) {
+ JobRun jobRun = activeRunMap.get(jobId);
+ if (jobRun == null) {
+ jobRun = runMapArchive.get(jobId);
+ }
+ return jobRun;
+ }
+
+ public void addJob(JobRun jobRun) {
+ IJobResourceController resourceController =
jobRun.getActivityClusterGraphFactory().getJobSpecification()
+ .getJobResourceController();
+ if (resourceController.allocate(computationResource)) {
+ executeJob(jobRun);
+ } else {
+ jobRun.setStatus(JobStatus.WAITING, null);
+ jobQueue.add(jobRun);
+ }
+ }
+
+ public void completeJob(JobRun run) {
+ JobId jobId = run.getJobId();
+ CCApplicationContext appCtx = ccs.getApplicationContext();
+ if (appCtx != null) {
+ try {
+ appCtx.notifyJobFinish(jobId);
+ } catch (HyracksException e) {
+ e.printStackTrace();
+ }
+ }
+ run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
+ run.setEndTime(System.currentTimeMillis());
+ activeRunMap.remove(jobId);
+ runMapArchive.put(jobId, run);
+ runMapHistory.put(jobId, run.getExceptions());
+
+ if (run.getActivityClusterGraph().isReportTaskDetails()) {
+ /**
+ * log job details when profiling is enabled
+ */
+ try {
+ ccs.getJobLogFile().log(createJobLogObject(run));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ // Releases computation resources occupied by the job.
+ IJobResourceController resourceController =
run.getActivityClusterGraphFactory().getJobSpecification()
+ .getJobResourceController();
+ resourceController.release(computationResource);
+
+ // Picks the next job to execute.
+ pickNextJobToRun();
+ }
+
+ public void terminateJob(JobId jobId, JobStatus status, List<Exception>
exceptions) {
+ JobRun run = getJob(jobId);
+ if (run == null) {
+ LOGGER.warning("Unable to find JobRun with id: " + jobId);
+ return;
+ }
+ if (run.getPendingStatus() != null &&
run.getCleanupPendingNodeIds().isEmpty()) {
+ completeJob(run);
+ return;
+ }
+ if (run.getPendingStatus() != null) {
+ LOGGER.warning("Ignoring duplicate cleanup for JobRun with id: " +
jobId);
+ return;
+ }
+ Set<String> targetNodes = run.getParticipatingNodeIds();
+ run.getCleanupPendingNodeIds().addAll(targetNodes);
+ if (run.getPendingStatus() != JobStatus.FAILURE &&
run.getPendingStatus() != JobStatus.TERMINATED) {
+ run.setPendingStatus(status, exceptions);
+ }
+ if (targetNodes != null && !targetNodes.isEmpty()) {
+ Set<String> toDelete = new HashSet<>();
+ for (String n : targetNodes) {
+ NodeControllerState ncs = ccs.getNodeMap().get(n);
+ try {
+ if (ncs == null) {
+ toDelete.add(n);
+ } else {
+ ncs.getNodeController().cleanUpJoblet(jobId, status);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ targetNodes.removeAll(toDelete);
+ run.getCleanupPendingNodeIds().removeAll(toDelete);
+ if (run.getCleanupPendingNodeIds().isEmpty()) {
+ completeJob(run);
+ }
+ } else {
+ completeJob(run);
+ }
+ }
+
+ public List<Exception> getRunHistory(JobId jobId) {
+ List<Exception> exceptions = runMapHistory.get(jobId);
+ return exceptions;
+ }
+
+ private void pickNextJobToRun() {
+ for (JobRun run : jobQueue) {
+ IJobResourceController resourceController =
run.getActivityClusterGraphFactory().getJobSpecification()
+ .getJobResourceController();
+ if (resourceController.allocate(computationResource)) {
+ executeJob(run);
+ }
+ }
+ }
+
+ // Executes a job when the resource for the job is available.
+ private void executeJob(JobRun run) {
+ IResultCallback<JobId> callback = run.getCallback();
+ try {
+ //run.setStatus(JobStatus.INITIALIZED, null);
+ run.setStartTime(System.currentTimeMillis());
+ JobId jobId = run.getJobId();
+ activeRunMap.put(jobId, run);
+
+ CCApplicationContext appCtx = ccs.getApplicationContext();
+ IActivityClusterGraphGeneratorFactory acggf =
run.getActivityClusterGraphFactory();
+ appCtx.notifyJobCreation(jobId, acggf);
+ run.setStatus(JobStatus.RUNNING, null);
+ try {
+ run.getExecutor().startJob();
+ } catch (Exception e) {
+ ccs.getWorkQueue().schedule(
+ new JobCleanupWork(ccs, run.getJobId(),
JobStatus.FAILURE, Collections.singletonList(e)));
+ }
+ callback.setValue(jobId);
+ } catch (Exception e) {
+ callback.setException(e);
+ }
+ }
+
+ private ObjectNode createJobLogObject(final JobRun run) {
+ ObjectMapper om = new ObjectMapper();
+ ObjectNode jobLogObject = om.createObjectNode();
+ ActivityClusterGraph acg = run.getActivityClusterGraph();
+ jobLogObject.set("activity-cluster-graph", acg.toJSON());
+ jobLogObject.set("job-run", run.toJSON());
+ return jobLogObject;
+ }
+
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
index f1d04bb..7e422e4 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
@@ -20,7 +20,6 @@
import java.io.PrintWriter;
import java.io.StringWriter;
-import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@@ -42,6 +41,7 @@
import org.apache.hyracks.api.job.ActivityClusterGraph;
import org.apache.hyracks.api.job.ActivityClusterId;
import org.apache.hyracks.api.job.IActivityClusterGraphGenerator;
+import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobStatus;
@@ -49,20 +49,23 @@
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.partitions.PartitionMatchMaker;
import org.apache.hyracks.control.cc.scheduler.ActivityPartitionDetails;
-import org.apache.hyracks.control.cc.scheduler.JobScheduler;
+import org.apache.hyracks.control.cc.scheduler.JobExecutor;
import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
import org.apache.hyracks.control.common.utils.ExceptionUtils;
+import org.apache.hyracks.control.common.work.IResultCallback;
public class JobRun implements IJobStatusConditionVariable {
private final DeploymentId deploymentId;
private final JobId jobId;
+ private final IActivityClusterGraphGeneratorFactory acggf;
+
private final IActivityClusterGraphGenerator acgg;
private final ActivityClusterGraph acg;
- private final JobScheduler scheduler;
+ private final JobExecutor scheduler;
private final EnumSet<JobFlag> jobFlags;
@@ -94,21 +97,26 @@
private Map<OperatorDescriptorId, Map<Integer, String>> operatorLocations;
+ private final IResultCallback<JobId> callback;
+
public JobRun(ClusterControllerService ccs, DeploymentId deploymentId,
JobId jobId,
- IActivityClusterGraphGenerator acgg, EnumSet<JobFlag> jobFlags) {
+ IActivityClusterGraphGeneratorFactory acggf,
IActivityClusterGraphGenerator acgg, EnumSet<JobFlag> jobFlags,
+ IResultCallback<JobId> callback) {
this.deploymentId = deploymentId;
this.jobId = jobId;
+ this.acggf = acggf;
this.acgg = acgg;
this.acg = acgg.initialize();
- this.scheduler = new JobScheduler(ccs, this, acgg.getConstraints());
+ this.scheduler = new JobExecutor(ccs, this, acgg.getConstraints());
this.jobFlags = jobFlags;
- activityClusterPlanMap = new HashMap<ActivityClusterId,
ActivityClusterPlan>();
+ this.callback = callback;
+ activityClusterPlanMap = new HashMap<>();
pmm = new PartitionMatchMaker();
- participatingNodeIds = new HashSet<String>();
- cleanupPendingNodeIds = new HashSet<String>();
+ participatingNodeIds = new HashSet<>();
+ cleanupPendingNodeIds = new HashSet<>();
profile = new JobProfile(jobId);
- connectorPolicyMap = new HashMap<ConnectorDescriptorId,
IConnectorPolicy>();
- operatorLocations = new HashMap<OperatorDescriptorId, Map<Integer,
String>>();
+ connectorPolicyMap = new HashMap<>();
+ operatorLocations = new HashMap<>();
createTime = System.currentTimeMillis();
}
@@ -118,6 +126,10 @@
public JobId getJobId() {
return jobId;
+ }
+
+ public IActivityClusterGraphGeneratorFactory
getActivityClusterGraphFactory() {
+ return acggf;
}
public ActivityClusterGraph getActivityClusterGraph() {
@@ -167,8 +179,8 @@
return createTime;
}
- public void setCreateTime(long createTime) {
- this.createTime = createTime;
+ public IResultCallback<JobId> getCallback() {
+ return callback;
}
public long getStartTime() {
@@ -228,7 +240,7 @@
return profile;
}
- public JobScheduler getScheduler() {
+ public JobExecutor getExecutor() {
return scheduler;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
index c13a458..1faa9c8 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
@@ -55,11 +55,11 @@
public class ActivityClusterPlanner {
private static final Logger LOGGER =
Logger.getLogger(ActivityClusterPlanner.class.getName());
- private final JobScheduler scheduler;
+ private final JobExecutor scheduler;
private final Map<PartitionId, TaskCluster>
partitionProducingTaskClusterMap;
- public ActivityClusterPlanner(JobScheduler newJobScheduler) {
+ public ActivityClusterPlanner(JobExecutor newJobScheduler) {
this.scheduler = newJobScheduler;
partitionProducingTaskClusterMap = new HashMap<PartitionId,
TaskCluster>();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobExecutor.java
similarity index 94%
rename from
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java
rename to
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobExecutor.java
index b577ff7..7ac4261 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobExecutor.java
@@ -31,8 +31,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.hyracks.control.cc.job.JobManager;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.constraints.Constraint;
import
org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression;
@@ -54,7 +53,6 @@
import org.apache.hyracks.api.util.JavaSerializationUtils;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.NodeControllerState;
-import org.apache.hyracks.control.cc.application.CCApplicationContext;
import org.apache.hyracks.control.cc.job.ActivityClusterPlan;
import org.apache.hyracks.control.cc.job.JobRun;
import org.apache.hyracks.control.cc.job.Task;
@@ -66,8 +64,8 @@
import org.apache.hyracks.control.common.job.PartitionState;
import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
-public class JobScheduler {
- private static final Logger LOGGER =
Logger.getLogger(JobScheduler.class.getName());
+public class JobExecutor {
+ private static final Logger LOGGER =
Logger.getLogger(JobExecutor.class.getName());
private final ClusterControllerService ccs;
@@ -80,7 +78,7 @@
private final Set<TaskCluster> inProgressTaskClusters;
- public JobScheduler(ClusterControllerService ccs, JobRun jobRun,
Collection<Constraint> constraints) {
+ public JobExecutor(ClusterControllerService ccs, JobRun jobRun,
Collection<Constraint> constraints) {
this.ccs = ccs;
this.jobRun = jobRun;
solver = new PartitionConstraintSolver();
@@ -666,7 +664,8 @@
jobRun.getParticipatingNodeIds().removeAll(deadNodes);
jobRun.getCleanupPendingNodeIds().removeAll(deadNodes);
if (jobRun.getPendingStatus() != null &&
jobRun.getCleanupPendingNodeIds().isEmpty()) {
- finishJob(jobRun);
+ JobManager jobManager = ccs.getJobManager();
+ jobManager.completeJob(jobRun);
return;
}
for (ActivityCluster ac :
jobRun.getActivityClusterGraph().getActivityClusterMap().values()) {
@@ -706,40 +705,4 @@
}
}
- private void finishJob(final JobRun run) {
- JobId jobId = run.getJobId();
- CCApplicationContext appCtx = ccs.getApplicationContext();
- if (appCtx != null) {
- try {
- appCtx.notifyJobFinish(jobId);
- } catch (HyracksException e) {
- e.printStackTrace();
- }
- }
- run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
- run.setEndTime(System.currentTimeMillis());
- ccs.getActiveRunMap().remove(jobId);
- ccs.getRunMapArchive().put(jobId, run);
- ccs.getRunHistory().put(jobId, run.getExceptions());
-
- if (run.getActivityClusterGraph().isReportTaskDetails()) {
- /**
- * log job details when task-profiling is enabled
- */
- try {
- ccs.getJobLogFile().log(createJobLogObject(run));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- private ObjectNode createJobLogObject(final JobRun run) {
- ObjectMapper om = new ObjectMapper();
- ObjectNode jobLogObject = om.createObjectNode();
- ActivityClusterGraph acg = run.getActivityClusterGraph();
- jobLogObject.set("activity-cluster-graph", acg.toJSON());
- jobLogObject.set("job-run", run.toJSON());
- return jobLogObject;
- }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
index 9134a91..b78c3e2 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
@@ -28,6 +28,7 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.job.ActivityPlan;
+import org.apache.hyracks.control.cc.job.JobManager;
import org.apache.hyracks.control.cc.job.JobRun;
import org.apache.hyracks.control.cc.job.Task;
import org.apache.hyracks.control.cc.job.TaskAttempt;
@@ -50,7 +51,8 @@
@Override
public final void runWork() {
- JobRun run = ccs.getActiveRunMap().get(jobId);
+ JobManager jobManager = ccs.getJobManager();
+ JobRun run = jobManager.getJob(jobId);
if (run != null) {
TaskId tid = taId.getTaskId();
Map<ActivityId, ActivityCluster> activityClusterMap =
run.getActivityClusterGraph().getActivityMap();
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetActivityClusterGraphJSONWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetActivityClusterGraphJSONWork.java
index 294ae97..64882e0 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetActivityClusterGraphJSONWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetActivityClusterGraphJSONWork.java
@@ -18,13 +18,14 @@
*/
package org.apache.hyracks.control.cc.work;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.common.work.SynchronizableWork;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.hyracks.control.cc.job.JobManager;
import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.job.JobRun;
-import org.apache.hyracks.control.common.work.SynchronizableWork;
public class GetActivityClusterGraphJSONWork extends SynchronizableWork {
private final ClusterControllerService ccs;
@@ -38,15 +39,12 @@
@Override
protected void doRun() throws Exception {
-
+ JobManager jobManager = ccs.getJobManager();
ObjectMapper om = new ObjectMapper();
- JobRun run = ccs.getActiveRunMap().get(jobId);
+ JobRun run = jobManager.getJob(jobId);
if (run == null) {
- run = ccs.getRunMapArchive().get(jobId);
- if (run == null) {
- json = om.createObjectNode();
- return;
- }
+ json = om.createObjectNode();
+ return;
}
json = run.getActivityClusterGraph().toJSON();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobInfoWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobInfoWork.java
index e072c21..f54a5b2 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobInfoWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobInfoWork.java
@@ -21,6 +21,7 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobInfo;
import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.JobManager;
import org.apache.hyracks.control.cc.job.JobRun;
import org.apache.hyracks.control.common.work.IResultCallback;
import org.apache.hyracks.control.common.work.SynchronizableWork;
@@ -39,10 +40,8 @@
@Override
protected void doRun() throws Exception {
try {
- JobRun run = ccs.getActiveRunMap().get(jobId);
- if (run == null) {
- run = ccs.getRunMapArchive().get(jobId);
- }
+ JobManager jobManager = ccs.getJobManager();
+ JobRun run = jobManager.getJob(jobId);
JobInfo info = (run != null) ? new JobInfo(run.getJobId(),
run.getStatus(), run.getOperatorLocations())
: null;
callback.setValue(info);
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobRunJSONWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobRunJSONWork.java
index aad6edf..40fee46 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobRunJSONWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobRunJSONWork.java
@@ -18,13 +18,14 @@
*/
package org.apache.hyracks.control.cc.work;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.JobManager;
import org.apache.hyracks.control.cc.job.JobRun;
import org.apache.hyracks.control.common.work.SynchronizableWork;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
public class GetJobRunJSONWork extends SynchronizableWork {
private final ClusterControllerService ccs;
@@ -38,14 +39,12 @@
@Override
protected void doRun() throws Exception {
+ JobManager jobManager = ccs.getJobManager();
ObjectMapper om = new ObjectMapper();
- JobRun run = ccs.getActiveRunMap().get(jobId);
+ JobRun run = jobManager.getJob(jobId);
if (run == null) {
- run = ccs.getRunMapArchive().get(jobId);
- if (run == null) {
- json = om.createObjectNode();
- return;
- }
+ json = om.createObjectNode();
+ return;
}
json = run.toJSON();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobStatusWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobStatusWork.java
index d45a9cc..72ff27e 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobStatusWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobStatusWork.java
@@ -21,6 +21,7 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.JobManager;
import org.apache.hyracks.control.cc.job.JobRun;
import org.apache.hyracks.control.common.work.IResultCallback;
import org.apache.hyracks.control.common.work.SynchronizableWork;
@@ -39,10 +40,8 @@
@Override
protected void doRun() throws Exception {
try {
- JobRun run = ccs.getActiveRunMap().get(jobId);
- if (run == null) {
- run = ccs.getRunMapArchive().get(jobId);
- }
+ JobManager jobManager = ccs.getJobManager();
+ JobRun run = jobManager.getJob(jobId);
JobStatus status = run == null ? null : run.getStatus();
callback.setValue(status);
} catch (Exception e) {
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobSummariesJSONWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobSummariesJSONWork.java
index 1e5a3a5..681ee39 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobSummariesJSONWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobSummariesJSONWork.java
@@ -23,6 +23,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.hyracks.control.cc.job.JobManager;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.job.JobRun;
import org.apache.hyracks.control.common.work.SynchronizableWork;
@@ -39,8 +40,9 @@
protected void doRun() throws Exception {
ObjectMapper om = new ObjectMapper();
summaries = om.createArrayNode();
- populateJSON(ccs.getActiveRunMap().values());
- populateJSON(ccs.getRunMapArchive().values());
+ JobManager jobManager = ccs.getJobManager();
+ populateJSON(jobManager.getAllRunningJobs());
+ populateJSON(jobManager.getArchivedJobs());
}
private void populateJSON(Collection<JobRun> jobRuns) {
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
index 2a383b6..b8f5454 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
@@ -18,21 +18,13 @@
*/
package org.apache.hyracks.control.cc.work;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
import java.util.logging.Logger;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.ActivityClusterGraph;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.NodeControllerState;
-import org.apache.hyracks.control.cc.application.CCApplicationContext;
-import org.apache.hyracks.control.cc.job.JobRun;
+import org.apache.hyracks.control.cc.job.JobManager;
import org.apache.hyracks.control.common.work.AbstractWork;
public class JobCleanupWork extends AbstractWork {
@@ -53,82 +45,8 @@
@Override
public void run() {
LOGGER.info("Cleanup for JobRun with id: " + jobId);
- final JobRun run = ccs.getActiveRunMap().get(jobId);
- if (run == null) {
- LOGGER.warning("Unable to find JobRun with id: " + jobId);
- return;
- }
- if (run.getPendingStatus() != null &&
run.getCleanupPendingNodeIds().isEmpty()) {
- finishJob(run);
- return;
- }
- if (run.getPendingStatus() != null) {
- LOGGER.warning("Ignoring duplicate cleanup for JobRun with id: " +
jobId);
- return;
- }
- Set<String> targetNodes = run.getParticipatingNodeIds();
- run.getCleanupPendingNodeIds().addAll(targetNodes);
- if (run.getPendingStatus() != JobStatus.FAILURE &&
run.getPendingStatus() != JobStatus.TERMINATED) {
- run.setPendingStatus(status, exceptions);
- }
- if (targetNodes != null && !targetNodes.isEmpty()) {
- Set<String> toDelete = new HashSet<String>();
- for (String n : targetNodes) {
- NodeControllerState ncs = ccs.getNodeMap().get(n);
- try {
- if (ncs == null) {
- toDelete.add(n);
- } else {
- ncs.getNodeController().cleanUpJoblet(jobId, status);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- targetNodes.removeAll(toDelete);
- run.getCleanupPendingNodeIds().removeAll(toDelete);
- if (run.getCleanupPendingNodeIds().isEmpty()) {
- finishJob(run);
- }
- } else {
- finishJob(run);
- }
- }
-
- private void finishJob(final JobRun run) {
- CCApplicationContext appCtx = ccs.getApplicationContext();
- if (appCtx != null) {
- try {
- appCtx.notifyJobFinish(jobId);
- } catch (HyracksException e) {
- e.printStackTrace();
- }
- }
- run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
- run.setEndTime(System.currentTimeMillis());
- ccs.getActiveRunMap().remove(jobId);
- ccs.getRunMapArchive().put(jobId, run);
- ccs.getRunHistory().put(jobId, run.getExceptions());
-
- if (run.getActivityClusterGraph().isReportTaskDetails()) {
- /**
- * log job details when profiling is enabled
- */
- try {
- ccs.getJobLogFile().log(createJobLogObject(run));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- private ObjectNode createJobLogObject(final JobRun run) {
- ObjectMapper om = new ObjectMapper();
- ObjectNode jobLogObject = om.createObjectNode();
- ActivityClusterGraph acg = run.getActivityClusterGraph();
- jobLogObject.set("activity-cluster-graph", acg.toJSON());
- jobLogObject.set("job-run", run.toJSON());
- return jobLogObject;
+ JobManager jobManager = ccs.getJobManager();
+ jobManager.terminateJob(jobId, status, exceptions);
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
index e7844e9..df9315f 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
@@ -29,6 +29,7 @@
import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.application.CCApplicationContext;
+import org.apache.hyracks.control.cc.job.JobManager;
import org.apache.hyracks.control.cc.job.JobRun;
import org.apache.hyracks.control.common.deployment.DeploymentUtils;
import org.apache.hyracks.control.common.work.IResultCallback;
@@ -59,19 +60,9 @@
IActivityClusterGraphGeneratorFactory acggf =
(IActivityClusterGraphGeneratorFactory) DeploymentUtils
.deserialize(acggfBytes, deploymentId, appCtx);
IActivityClusterGraphGenerator acgg =
acggf.createActivityClusterGraphGenerator(jobId, appCtx, jobFlags);
- JobRun run = new JobRun(ccs, deploymentId, jobId, acgg, jobFlags);
- run.setStatus(JobStatus.INITIALIZED, null);
- run.setStartTime(System.currentTimeMillis());
- ccs.getActiveRunMap().put(jobId, run);
- appCtx.notifyJobCreation(jobId, acggf);
- run.setStatus(JobStatus.RUNNING, null);
- try {
- run.getScheduler().startJob();
- } catch (Exception e) {
- ccs.getWorkQueue().schedule(
- new JobCleanupWork(ccs, run.getJobId(),
JobStatus.FAILURE, Collections.singletonList(e)));
- }
- callback.setValue(jobId);
+ JobRun run = new JobRun(ccs, deploymentId, jobId, acggf, acgg,
jobFlags, callback);
+ JobManager jobManager = ccs.getJobManager();
+ jobManager.addJob(run);
} catch (Exception e) {
callback.setException(e);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
index 603b6f8..52e4960 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
@@ -22,11 +22,10 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.NodeControllerState;
-import org.apache.hyracks.control.cc.application.CCApplicationContext;
+import org.apache.hyracks.control.cc.job.JobManager;
import org.apache.hyracks.control.cc.job.JobRun;
public class JobletCleanupNotificationWork extends AbstractHeartbeatWork {
@@ -45,7 +44,8 @@
@Override
public void runWork() {
- final JobRun run = ccs.getActiveRunMap().get(jobId);
+ JobManager jobManager = ccs.getJobManager();
+ final JobRun run = jobManager.getJob(jobId);
Set<String> cleanupPendingNodes = run.getCleanupPendingNodeIds();
if (!cleanupPendingNodes.remove(nodeId)) {
if (LOGGER.isLoggable(Level.WARNING)) {
@@ -59,19 +59,7 @@
ncs.getActiveJobIds().remove(jobId);
}
if (cleanupPendingNodes.isEmpty()) {
- CCApplicationContext appCtx = ccs.getApplicationContext();
- if (appCtx != null) {
- try {
- appCtx.notifyJobFinish(jobId);
- } catch (HyracksException e) {
- e.printStackTrace();
- }
- }
- run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
- run.setEndTime(System.currentTimeMillis());
- ccs.getActiveRunMap().remove(jobId);
- ccs.getRunMapArchive().put(jobId, run);
- ccs.getRunHistory().put(jobId, run.getExceptions());
+ jobManager.completeJob(run);
}
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
index 2c5c965..4cac253 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
@@ -24,6 +24,7 @@
import org.apache.hyracks.api.partitions.PartitionId;
import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.JobManager;
import org.apache.hyracks.control.cc.job.JobRun;
import org.apache.hyracks.control.cc.partitions.PartitionMatchMaker;
import org.apache.hyracks.control.cc.partitions.PartitionUtils;
@@ -43,7 +44,8 @@
@Override
public void run() {
final PartitionId pid = partitionDescriptor.getPartitionId();
- JobRun run = ccs.getActiveRunMap().get(pid.getJobId());
+ JobManager jobManager = ccs.getJobManager();
+ JobRun run = jobManager.getJob(pid.getJobId());
if (run == null) {
return;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterPartitionRequestWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterPartitionRequestWork.java
index 44fc40d..0a5d31d 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterPartitionRequestWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterPartitionRequestWork.java
@@ -22,6 +22,7 @@
import org.apache.hyracks.api.partitions.PartitionId;
import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.JobManager;
import org.apache.hyracks.control.cc.job.JobRun;
import org.apache.hyracks.control.cc.partitions.PartitionMatchMaker;
import org.apache.hyracks.control.cc.partitions.PartitionUtils;
@@ -41,7 +42,8 @@
@Override
public void run() {
PartitionId pid = partitionRequest.getPartitionId();
- JobRun run = ccs.getActiveRunMap().get(pid.getJobId());
+ JobManager jobManager = ccs.getJobManager();
+ JobRun run = jobManager.getJob(pid.getJobId());
if (run == null) {
return;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
index 510c729..5850f44 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
@@ -28,6 +28,7 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.job.JobManager;
import org.apache.hyracks.control.cc.job.JobRun;
import org.apache.hyracks.control.common.work.AbstractWork;
@@ -63,10 +64,11 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Number of affected jobs: " + size);
}
+ JobManager jobManager = ccs.getJobManager();
for (JobId jobId : affectedJobIds) {
- JobRun run = ccs.getActiveRunMap().get(jobId);
+ JobRun run = jobManager.getJob(jobId);
if (run != null) {
- run.getScheduler().notifyNodeFailures(deadNodes);
+ run.getExecutor().notifyNodeFailures(deadNodes);
}
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportProfilesWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportProfilesWork.java
index 2278389..de19b75 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportProfilesWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportProfilesWork.java
@@ -24,6 +24,7 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.JobManager;
import org.apache.hyracks.control.cc.job.JobRun;
import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
import org.apache.hyracks.control.common.work.AbstractWork;
@@ -39,9 +40,9 @@
@Override
public void run() {
- Map<JobId, JobRun> runMap = ccs.getActiveRunMap();
+ JobManager jobManager = ccs.getJobManager();
for (JobProfile profile : profiles) {
- JobRun run = runMap.get(profile.getJobId());
+ JobRun run = jobManager.getJob(profile.getJobId());
if (run != null) {
JobProfile jp = run.getJobProfile();
jp.merge(profile);
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java
index 2379871..45929e0 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java
@@ -24,6 +24,7 @@
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.JobManager;
import org.apache.hyracks.control.cc.job.JobRun;
import org.apache.hyracks.control.cc.job.TaskAttempt;
import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
@@ -42,7 +43,8 @@
@Override
protected void performEvent(TaskAttempt ta) {
try {
- JobRun run = ccs.getActiveRunMap().get(jobId);
+ JobManager jobManager = ccs.getJobManager();
+ JobRun run = jobManager.getJob(jobId);
if (statistics != null) {
JobProfile jobProfile = run.getJobProfile();
Map<String, JobletProfile> jobletProfiles =
jobProfile.getJobletProfiles();
@@ -53,7 +55,7 @@
}
jobletProfile.getTaskProfiles().put(taId, statistics);
}
- run.getScheduler().notifyTaskComplete(ta);
+ run.getExecutor().notifyTaskComplete(ta);
} catch (HyracksException e) {
e.printStackTrace();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
index 8bca4e7..1f0acc8 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
@@ -23,6 +23,7 @@
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.JobManager;
import org.apache.hyracks.control.cc.job.JobRun;
import org.apache.hyracks.control.cc.job.TaskAttempt;
@@ -37,9 +38,10 @@
@Override
protected void performEvent(TaskAttempt ta) {
- JobRun run = ccs.getActiveRunMap().get(jobId);
+ JobManager jobManager = ccs.getJobManager();
+ JobRun run = jobManager.getJob(jobId);
ccs.getDatasetDirectoryService().reportJobFailure(jobId, exceptions);
- run.getScheduler().notifyTaskFailure(ta, exceptions);
+ run.getExecutor().notifyTaskFailure(ta, exceptions);
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
index c1fa945..de42561 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
@@ -23,6 +23,7 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.job.IJobStatusConditionVariable;
+import org.apache.hyracks.control.cc.job.JobManager;
import org.apache.hyracks.control.common.work.IResultCallback;
import org.apache.hyracks.control.common.work.SynchronizableWork;
@@ -39,7 +40,8 @@
@Override
protected void doRun() throws Exception {
- final IJobStatusConditionVariable cRunningVar =
ccs.getActiveRunMap().get(jobId);
+ JobManager jobManager = ccs.getJobManager();
+ final IJobStatusConditionVariable cRunningVar =
jobManager.getJob(jobId);
if (cRunningVar != null) {
ccs.getExecutor().execute(new Runnable() {
@Override
@@ -53,7 +55,7 @@
}
});
} else {
- final IJobStatusConditionVariable cArchivedVar =
ccs.getRunMapArchive().get(jobId);
+ final IJobStatusConditionVariable cArchivedVar =
jobManager.getJob(jobId);
if (cArchivedVar != null) {
ccs.getExecutor().execute(new Runnable() {
@Override
@@ -67,7 +69,7 @@
}
});
} else {
- final List<Exception> exceptions =
ccs.getRunHistory().get(jobId);
+ final List<Exception> exceptions =
jobManager.getRunHistory(jobId);
ccs.getExecutor().execute(new Runnable() {
@Override
public void run() {
--
To view, visit https://asterix-gerrit.ics.uci.edu/1424
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I8fb6fda57efa139114dd234e08cc7de7129468c8
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Yingyi Bu <[email protected]>