Steven Jacobs has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1377
Change subject: ASTERIXDB-1747 Implemented full lifecycle capabilities for distributed jobs
......................................................................
ASTERIXDB-1747 Implemented full lifecycle capabilities for distributed jobs
Added distribute and destroy functionality
Removed serialization and byte transfer when running pre-distributed jobs
Cleaned up methods
Change-Id: I59c3422d5c1ab7756a6a4685ac527dfe50434954
---
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java
A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java
A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java
A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
20 files changed, 534 insertions(+), 46 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/77/1377/1
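
To make the intended lifecycle concrete, below is a minimal client-side sketch using the methods this change adds to IHyracksClientConnection (distributeJob, startJob(JobId), destroyJob) together with the pre-existing waitForCompletion call. The CC endpoint, the port, and the buildSpec() helper are hypothetical placeholders, not part of this patch:

    import org.apache.hyracks.api.client.HyracksConnection;
    import org.apache.hyracks.api.client.IHyracksClientConnection;
    import org.apache.hyracks.api.job.JobId;
    import org.apache.hyracks.api.job.JobSpecification;

    public class PreDistributedJobExample {
        public static void main(String[] args) throws Exception {
            // Hypothetical cluster controller endpoint; substitute a reachable CC host and client port.
            IHyracksClientConnection hcc = new HyracksConnection("localhost", 1098);
            JobSpecification spec = buildSpec(); // hypothetical helper that assembles the job

            // Distribute: the ActivityClusterGraph is serialized once and pushed to every NC.
            JobId jobId = hcc.distributeJob(spec);

            // Start by id: no graph bytes are shipped again for a pre-distributed job;
            // the cluster controller returns the same JobId.
            hcc.startJob(jobId);
            hcc.waitForCompletion(jobId);

            // Destroy: drop the stored graph from the CC and from every NC once it is no longer needed.
            hcc.destroyJob(jobId);
        }

        private static JobSpecification buildSpec() {
            // Placeholder only; a real specification wires operators and connectors.
            return new JobSpecification();
        }
    }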
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index 780e205..aa292f6 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -36,6 +36,8 @@
GET_JOB_STATUS,
GET_JOB_INFO,
START_JOB,
+ DISTRIBUTE_JOB,
+ DESTROY_JOB,
GET_DATASET_DIRECTORY_SERIVICE_INFO,
GET_DATASET_RESULT_STATUS,
GET_DATASET_RESULT_LOCATIONS,
@@ -101,6 +103,44 @@
}
}
+ public static class DistributeJobFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final byte[] acggfBytes;
+
+ public DistributeJobFunction(byte[] acggfBytes) {
+ this.acggfBytes = acggfBytes;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.DISTRIBUTE_JOB;
+ }
+
+ public byte[] getACGGFBytes() {
+ return acggfBytes;
+ }
+ }
+
+ public static class DestroyJobFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ public DestroyJobFunction(JobId jobId) {
+ this.jobId = jobId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.DESTROY_JOB;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+ }
+
public static class StartJobFunction extends Function {
private static final long serialVersionUID = 1L;
@@ -116,8 +156,8 @@
this.jobId = jobId;
}
- public StartJobFunction(byte[] acggfBytes, EnumSet<JobFlag> jobFlags, JobId jobId) {
- this(null, acggfBytes, jobFlags, jobId);
+ public StartJobFunction(JobId jobId) {
+ this(null, null, null, jobId);
}
public StartJobFunction(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index c049007..8e7affb 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -69,9 +69,9 @@
}
@Override
- public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags, JobId jobId) throws Exception {
+ public JobId startJob(JobId jobId) throws Exception {
HyracksClientInterfaceFunctions.StartJobFunction sjf =
- new HyracksClientInterfaceFunctions.StartJobFunction(acggfBytes, jobFlags, jobId);
+ new HyracksClientInterfaceFunctions.StartJobFunction(jobId);
return (JobId) rpci.call(ipcHandle, sjf);
}
@@ -83,6 +83,20 @@
}
@Override
+ public JobId distributeJob(byte[] acggfBytes) throws Exception {
+ HyracksClientInterfaceFunctions.DistributeJobFunction sjf =
+ new HyracksClientInterfaceFunctions.DistributeJobFunction(acggfBytes);
+ return (JobId) rpci.call(ipcHandle, sjf);
+ }
+
+ @Override
+ public JobId destroyJob(JobId jobId) throws Exception {
+ HyracksClientInterfaceFunctions.DestroyJobFunction sjf =
+ new HyracksClientInterfaceFunctions.DestroyJobFunction(jobId);
+ return (JobId) rpci.call(ipcHandle, sjf);
+ }
+
+ @Override
public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception {
HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction gddsf =
new HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction();
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index eb92c37..10e45d8 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -103,19 +103,28 @@
}
@Override
- public JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags, JobId jobId) throws Exception {
+ public JobId distributeJob(JobSpecification jobSpec) throws Exception {
JobSpecificationActivityClusterGraphGeneratorFactory jsacggf =
new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec);
- return startJob(jsacggf, jobFlags, jobId);
+ return distributeJob(jsacggf);
+ }
+
+ @Override
+ public JobId destroyJob(JobId jobId) throws Exception {
+ return hci.destroyJob(jobId);
+ }
+
+ @Override
+ public JobId startJob(JobId jobId) throws Exception {
+ return hci.startJob(jobId);
}
public JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) throws Exception {
return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags);
}
- public JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags, JobId jobId)
- throws Exception {
- return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags, jobId);
+ public JobId distributeJob(IActivityClusterGraphGeneratorFactory acggf) throws Exception {
+ return hci.distributeJob(JavaSerializationUtils.serialize(acggf));
}
public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
index 031896e..dc51324 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
@@ -91,11 +91,27 @@
* Job Specification
* @param jobFlags
* Flags
- * @param jobId
- * Used to run a pre-distributed job by id (the same value will be returned)
* @throws Exception
*/
- public JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags, JobId jobId) throws Exception;
+ public JobId distributeJob(JobSpecification jobSpec) throws Exception;
+
+ /**
+ * Destroy the distributed graph for a pre-distributed job
+ *
+ * @param jobId
+ * The id of the predistributed job
+ * @throws Exception
+ */
+ public JobId destroyJob(JobId jobId) throws Exception;
+
+ /**
+ * Used to run a pre-distributed job by id (the same JobId will be returned)
+ *
+ * @param jobId
+ * The id of the predistributed job
+ * @throws Exception
+ */
+ public JobId startJob(JobId jobId) throws Exception;
/**
* Start the specified Job.
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
index 39063c6..f7995d7 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
@@ -38,7 +38,11 @@
public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception;
- public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags, JobId jobId) throws Exception;
+ public JobId startJob(JobId jobId) throws Exception;
+
+ public JobId distributeJob(byte[] acggfBytes) throws Exception;
+
+ public JobId destroyJob(JobId jobId) throws Exception;
public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
index 1656c51..a33c6c9 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
@@ -19,6 +19,5 @@
package org.apache.hyracks.api.job;
public enum JobFlag {
- PROFILE_RUNTIME,
- STORE_JOB
+ PROFILE_RUNTIME
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
index 26beb63..67f75ff 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -31,6 +31,8 @@
import org.apache.hyracks.control.cc.work.CliDeployBinaryWork;
import org.apache.hyracks.control.cc.work.CliUnDeployBinaryWork;
import org.apache.hyracks.control.cc.work.ClusterShutdownWork;
+import org.apache.hyracks.control.cc.work.DestroyJobWork;
+import org.apache.hyracks.control.cc.work.DistributeJobWork;
import org.apache.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork;
import org.apache.hyracks.control.cc.work.GetJobInfoWork;
import org.apache.hyracks.control.cc.work.GetJobStatusWork;
@@ -81,18 +83,35 @@
ccs.getWorkQueue().schedule(new GetJobInfoWork(ccs, gjif.getJobId(),
new IPCResponder<JobInfo>(handle, mid)));
break;
+ case DISTRIBUTE_JOB:
+ HyracksClientInterfaceFunctions.DistributeJobFunction djf =
+ (HyracksClientInterfaceFunctions.DistributeJobFunction) fn;
+ ccs.getWorkQueue().schedule(new DistributeJobWork(ccs, djf.getACGGFBytes(), jobIdFactory.create(),
+ new IPCResponder<JobId>(handle, mid)));
+ break;
+ case DESTROY_JOB:
+ HyracksClientInterfaceFunctions.DestroyJobFunction dsjf =
+ (HyracksClientInterfaceFunctions.DestroyJobFunction) fn;
+ ccs.getWorkQueue()
+ .schedule(new DestroyJobWork(ccs, dsjf.getJobId(), new IPCResponder<JobId>(handle, mid)));
+ break;
case START_JOB:
HyracksClientInterfaceFunctions.StartJobFunction sjf =
(HyracksClientInterfaceFunctions.StartJobFunction) fn;
JobId jobId = sjf.getJobId();
byte[] acggfBytes = null;
+ boolean predistributed = false;
if (jobId == null) {
+ //The job is new
jobId = jobIdFactory.create();
+ acggfBytes = sjf.getACGGFBytes();
}
- //TODO: only send these when the jobId is null
- acggfBytes = sjf.getACGGFBytes();
+ else {
+ //The job has been predistributed. We don't need to send an ActivityClusterGraph
+ predistributed = true;
+ }
ccs.getWorkQueue().schedule(new JobStartWork(ccs, sjf.getDeploymentId(), acggfBytes, sjf.getJobFlags(),
- jobId, new IPCResponder<JobId>(handle, mid)));
+ jobId, new IPCResponder<JobId>(handle, mid), predistributed));
break;
case GET_DATASET_DIRECTORY_SERIVICE_INFO:
ccs.getWorkQueue().schedule(new GetDatasetDirectoryServiceInfoWork(ccs,
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 5fdcede..9a4078a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -25,6 +25,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Hashtable;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -40,9 +41,11 @@
import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
import org.apache.hyracks.api.client.ClusterControllerInfo;
import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.constraints.Constraint;
import org.apache.hyracks.api.context.ICCContext;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.ActivityClusterGraph;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.api.topology.ClusterTopology;
@@ -101,6 +104,10 @@
private final Map<JobId, List<Exception>> runMapHistory;
+ private final Map<JobId, ActivityClusterGraph> activityClusterGraphMap;
+
+ private final Map<JobId, Set<Constraint>> activityClusterGraphConstraintsMap;
+
private final WorkQueue workQueue;
private ExecutorService executor;
@@ -138,6 +145,8 @@
new JavaSerializationBasedPayloadSerializerDeserializer());
webServer = new WebServer(this);
activeRunMap = new HashMap<>();
+ activityClusterGraphMap = new Hashtable<>();
+ activityClusterGraphConstraintsMap = new Hashtable<>();
runMapArchive = new LinkedHashMap<JobId, JobRun>() {
private static final long serialVersionUID = 1L;
@@ -313,6 +322,14 @@
return runMapHistory;
}
+ public Map<JobId, ActivityClusterGraph> getActivityClusterGraphMap() {
+ return activityClusterGraphMap;
+ }
+
+ public Map<JobId, Set<Constraint>> getActivityClusterGraphConstraintsMap() {
+ return activityClusterGraphConstraintsMap;
+ }
+
public Map<InetAddress, Set<String>> getIpAddressNodeNameMap() {
return ipAddressNodeNameMap;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
index dbe4202..b788d10 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
@@ -20,7 +20,6 @@
import java.io.PrintWriter;
import java.io.StringWriter;
-import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@@ -28,10 +27,7 @@
import java.util.Map;
import java.util.Set;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
+import org.apache.hyracks.api.constraints.Constraint;
import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
@@ -53,17 +49,20 @@
import org.apache.hyracks.control.cc.scheduler.JobScheduler;
import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
import org.apache.hyracks.control.common.utils.ExceptionUtils;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
public class JobRun implements IJobStatusConditionVariable {
private final DeploymentId deploymentId;
private final JobId jobId;
- private final IActivityClusterGraphGenerator acgg;
+ private IActivityClusterGraphGenerator acgg;
- private final ActivityClusterGraph acg;
+ private ActivityClusterGraph acg;
- private final JobScheduler scheduler;
+ private JobScheduler scheduler;
private final EnumSet<JobFlag> jobFlags;
@@ -95,13 +94,9 @@
private Map<OperatorDescriptorId, Map<Integer, String>> operatorLocations;
- public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId,
- IActivityClusterGraphGenerator acgg, EnumSet<JobFlag> jobFlags) {
+ private JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId, EnumSet<JobFlag> jobFlags) {
this.deploymentId = deploymentId;
this.jobId = jobId;
- this.acgg = acgg;
- this.acg = acgg.initialize();
- this.scheduler = new JobScheduler(ccs, this, acgg.getConstraints());
this.jobFlags = jobFlags;
activityClusterPlanMap = new HashMap<ActivityClusterId, ActivityClusterPlan>();
pmm = new PartitionMatchMaker();
@@ -111,6 +106,32 @@
connectorPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
operatorLocations = new HashMap<OperatorDescriptorId, Map<Integer, String>>();
createTime = System.currentTimeMillis();
+
+ }
+
+ //Run a Pre-distributed job by passing the ActivityClusterGraph
+ public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId)
+ throws HyracksException {
+ this(ccs, deploymentId, jobId, EnumSet.noneOf(JobFlag.class));
+ Map<JobId, ActivityClusterGraph> acgMap = ccs.getActivityClusterGraphMap();
+ Map<JobId, Set<Constraint>> acgConstaintsMap = ccs.getActivityClusterGraphConstraintsMap();
+ ActivityClusterGraph entry = acgMap.get(jobId);
+ Set<Constraint> constaints = acgConstaintsMap.get(jobId);
+ if (entry == null || constaints == null) {
+ throw new HyracksException("Trying to run a pre-destributed job with no cluster map");
+ }
+ this.acg = entry;
+ this.acgg = null;
+ this.scheduler = new JobScheduler(ccs, this, constaints, true);
+ }
+
+ //Run a new job by creating an ActivityClusterGraph
+ public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId,
+ IActivityClusterGraphGenerator acgg, EnumSet<JobFlag> jobFlags) {
+ this(ccs, deploymentId, jobId, jobFlags);
+ this.acgg = acgg;
+ this.acg = acgg.initialize();
+ this.scheduler = new JobScheduler(ccs, this, acgg.getConstraints(), false);
}
public DeploymentId getDeploymentId() {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java
index ab026eb..db24d7d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java
@@ -31,9 +31,6 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.json.JSONException;
-import org.json.JSONObject;
-
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.constraints.Constraint;
import org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression;
@@ -66,6 +63,8 @@
import org.apache.hyracks.control.cc.work.JobCleanupWork;
import org.apache.hyracks.control.common.job.PartitionState;
import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
+import org.json.JSONException;
+import org.json.JSONObject;
public class JobScheduler {
private static final Logger LOGGER = Logger.getLogger(JobScheduler.class.getName());
@@ -76,13 +75,17 @@
private final PartitionConstraintSolver solver;
+ private final boolean predistributed;
+
private final Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap;
private final Set<TaskCluster> inProgressTaskClusters;
- public JobScheduler(ClusterControllerService ccs, JobRun jobRun, Collection<Constraint> constraints) {
+ public JobScheduler(ClusterControllerService ccs, JobRun jobRun, Collection<Constraint> constraints,
+ boolean predistributed) {
this.ccs = ccs;
this.jobRun = jobRun;
+ this.predistributed = predistributed;
solver = new PartitionConstraintSolver();
partitionProducingTaskClusterMap = new HashMap<PartitionId, TaskCluster>();
inProgressTaskClusters = new HashSet<TaskCluster>();
@@ -465,7 +468,6 @@
final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = new HashMap<ConnectorDescriptorId, IConnectorPolicy>(
jobRun.getConnectorPolicyMap());
try {
- byte[] acgBytes = JavaSerializationUtils.serialize(acg);
for (Map.Entry<String, List<TaskAttemptDescriptor>> entry : taskAttemptMap.entrySet()) {
String nodeId = entry.getKey();
final List<TaskAttemptDescriptor> taskDescriptors = entry.getValue();
@@ -476,6 +478,10 @@
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Starting: " + taskDescriptors + " at " + entry.getKey());
}
+ byte[] acgBytes = null;
+ if (!predistributed && changed) {
+ acgBytes = JavaSerializationUtils.serialize(acg);
+ }
byte[] jagBytes = changed ? acgBytes : null;
node.getNodeController().startTasks(deploymentId, jobId, jagBytes, taskDescriptors,
connectorPolicies, jobRun.getFlags());
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java
new file mode 100644
index 0000000..d565511
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc.work;
+
+import java.util.Map;
+
+import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.common.work.IResultCallback;
+import org.apache.hyracks.control.common.work.SynchronizableWork;
+
+public class DestroyJobWork extends SynchronizableWork {
+ private final ClusterControllerService ccs;
+ private final JobId jobId;
+ private final IResultCallback<JobId> callback;
+
+ public DestroyJobWork(ClusterControllerService ccs, JobId jobId, IResultCallback<JobId> callback) {
+ this.jobId = jobId;
+ this.ccs = ccs;
+ this.callback = callback;
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ try {
+ Map<JobId, ActivityClusterGraph> acgMap = ccs.getActivityClusterGraphMap();
+ ActivityClusterGraph acg = acgMap.get(jobId);
+ if (acg == null) {
+ throw new RuntimeException("Trying to destroy a job that was never distributed!");
+ }
+ acgMap.remove(jobId);
+ for (NodeControllerState node : ccs.getNodeMap().values()) {
+ node.getNodeController().destroyJob(jobId);
+ }
+ callback.setValue(jobId);
+ } catch (Exception e) {
+ callback.setException(e);
+ }
+ }
+}
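
As a usage note for reviewers, the check above means a second destroy of the same id is expected to surface an error on the client, assuming the callback exception propagates through the RPC layer as it does for the other client-interface calls (hcc and jobId below are hypothetical):

    // Sketch of the destroy error path; hcc is an IHyracksClientConnection, jobId a previously distributed job's id.
    hcc.destroyJob(jobId);          // first destroy removes the graph from the CC map and the NCs
    try {
        hcc.destroyJob(jobId);      // the graph is already gone, so DestroyJobWork reports a failure
    } catch (Exception e) {
        System.err.println("Expected failure: " + e.getMessage());
    }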
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
new file mode 100644
index 0000000..8ec315c
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc.work;
+
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hyracks.api.constraints.Constraint;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.job.IActivityClusterGraphGenerator;
+import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.util.JavaSerializationUtils;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.application.CCApplicationContext;
+import org.apache.hyracks.control.common.deployment.DeploymentUtils;
+import org.apache.hyracks.control.common.work.IResultCallback;
+import org.apache.hyracks.control.common.work.SynchronizableWork;
+
+public class DistributeJobWork extends SynchronizableWork {
+ private final ClusterControllerService ccs;
+ private final byte[] acggfBytes;
+ private final JobId jobId;
+ private final IResultCallback<JobId> callback;
+
+ public DistributeJobWork(ClusterControllerService ccs, byte[] acggfBytes, JobId jobId,
+ IResultCallback<JobId> callback) {
+ this.jobId = jobId;
+ this.ccs = ccs;
+ this.acggfBytes = acggfBytes;
+ this.callback = callback;
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ try {
+ final CCApplicationContext appCtx = ccs.getApplicationContext();
+ Map<JobId, ActivityClusterGraph> acgMap = ccs.getActivityClusterGraphMap();
+ Map<JobId, Set<Constraint>> acgConstaintsMap = ccs.getActivityClusterGraphConstraintsMap();
+ ActivityClusterGraph entry = acgMap.get(jobId);
+ Set<Constraint> constaints = acgConstaintsMap.get(jobId);
+ if (entry != null || constaints != null) {
+ throw new HyracksException("Trying to distribute a job with a duplicate jobId");
+ }
+ IActivityClusterGraphGeneratorFactory acggf =
+ (IActivityClusterGraphGeneratorFactory) DeploymentUtils.deserialize(acggfBytes, null, appCtx);
+ IActivityClusterGraphGenerator acgg =
+ acggf.createActivityClusterGraphGenerator(jobId, appCtx, EnumSet.noneOf(JobFlag.class));
+ ActivityClusterGraph acg = acgg.initialize();
+ acgMap.put(jobId, acg);
+ acgConstaintsMap.put(jobId, acgg.getConstraints());
+
+ appCtx.notifyJobCreation(jobId, acggf);
+
+ byte[] acgBytes = JavaSerializationUtils.serialize(acg);
+ for (NodeControllerState node : ccs.getNodeMap().values()) {
+ node.getNodeController().distributeJob(jobId, acgBytes);
+ }
+
+ callback.setValue(jobId);
+ } catch (Exception e) {
+ callback.setException(e);
+ }
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
index e7844e9..6b7f26f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
@@ -41,29 +41,42 @@
private final DeploymentId deploymentId;
private final JobId jobId;
private final IResultCallback<JobId> callback;
+ private final boolean predestributed;
public JobStartWork(ClusterControllerService ccs, DeploymentId deploymentId, byte[] acggfBytes,
- EnumSet<JobFlag> jobFlags, JobId jobId, IResultCallback<JobId> callback) {
+ EnumSet<JobFlag> jobFlags, JobId jobId, IResultCallback<JobId> callback, boolean predestributed) {
this.deploymentId = deploymentId;
this.jobId = jobId;
this.ccs = ccs;
this.acggfBytes = acggfBytes;
this.jobFlags = jobFlags;
this.callback = callback;
+ this.predestributed = predestributed;
}
@Override
protected void doRun() throws Exception {
try {
final CCApplicationContext appCtx = ccs.getApplicationContext();
- IActivityClusterGraphGeneratorFactory acggf = (IActivityClusterGraphGeneratorFactory) DeploymentUtils
- .deserialize(acggfBytes, deploymentId, appCtx);
- IActivityClusterGraphGenerator acgg = acggf.createActivityClusterGraphGenerator(jobId, appCtx, jobFlags);
- JobRun run = new JobRun(ccs, deploymentId, jobId, acgg, jobFlags);
+ JobRun run;
+ IActivityClusterGraphGeneratorFactory acggf = null;
+ if (!predestributed) {
+ //Need to create the ActivityClusterGraph
+ acggf = (IActivityClusterGraphGeneratorFactory) DeploymentUtils
+ .deserialize(acggfBytes, deploymentId, appCtx);
+ IActivityClusterGraphGenerator acgg =
+ acggf.createActivityClusterGraphGenerator(jobId, appCtx, jobFlags);
+ run = new JobRun(ccs, deploymentId, jobId, acgg, jobFlags);
+ } else {
+ //ActivityClusterGraph has already been distributed
+ run = new JobRun(ccs, deploymentId, jobId);
+ }
run.setStatus(JobStatus.INITIALIZED, null);
run.setStartTime(System.currentTimeMillis());
ccs.getActiveRunMap().put(jobId, run);
- appCtx.notifyJobCreation(jobId, acggf);
+ if (!predestributed) {
+ appCtx.notifyJobCreation(jobId, acggf);
+ }
run.setStatus(JobStatus.RUNNING, null);
try {
run.getScheduler().startJob();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index dff5827..aaea856 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -49,6 +49,10 @@
public void undeployBinary(DeploymentId deploymentId) throws Exception;
+ public void distributeJob(JobId jobId, byte[] planBytes) throws Exception;
+
+ public void destroyJob(JobId jobId) throws Exception;
+
public void dumpState(String stateDumpId) throws Exception;
public void shutdown(boolean terminateNCService) throws Exception;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index aa9a4fe..18e1b79 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -98,6 +98,9 @@
SHUTDOWN_REQUEST,
SHUTDOWN_RESPONSE,
+ DISTRIBUTE_JOB,
+ DESTROY_JOB,
+
STATE_DUMP_REQUEST,
STATE_DUMP_RESPONSE,
@@ -668,6 +671,51 @@
}
}
+ public static class DistributeJobFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ private final byte[] acgBytes;
+
+ public DistributeJobFunction(JobId jobId, byte[] acgBytes) {
+ this.jobId = jobId;
+ this.acgBytes = acgBytes;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.DISTRIBUTE_JOB;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public byte[] getacgBytes() {
+ return acgBytes;
+ }
+ }
+
+ public static class DestroyJobFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ public DestroyJobFunction(JobId jobId) {
+ this.jobId = jobId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.DESTROY_JOB;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+ }
+
public static class StartTasksFunction extends Function {
private static final long serialVersionUID = 1L;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index c3376e6..aa96bdb 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -84,6 +84,18 @@
}
@Override
+ public void distributeJob(JobId jobId, byte[] planBytes) throws Exception {
+ CCNCFunctions.DistributeJobFunction fn = new CCNCFunctions.DistributeJobFunction(jobId, planBytes);
+ ipcHandle.send(-1, fn, null);
+ }
+
+ @Override
+ public void destroyJob(JobId jobId) throws Exception {
+ CCNCFunctions.DestroyJobFunction fn = new CCNCFunctions.DestroyJobFunction(jobId);
+ ipcHandle.send(-1, fn, null);
+ }
+
+ @Override
public void dumpState(String stateDumpId) throws Exception {
CCNCFunctions.StateDumpRequestFunction dsf = new CCNCFunctions.StateDumpRequestFunction(stateDumpId);
ipcHandle.send(-1, dsf, null);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index 93ccaa4..9a58cc5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -26,6 +26,8 @@
import org.apache.hyracks.control.nc.work.ApplicationMessageWork;
import org.apache.hyracks.control.nc.work.CleanupJobletWork;
import org.apache.hyracks.control.nc.work.DeployBinaryWork;
+import org.apache.hyracks.control.nc.work.DestroyJobWork;
+import org.apache.hyracks.control.nc.work.DistributeJobWork;
import org.apache.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
import org.apache.hyracks.control.nc.work.StartTasksWork;
import org.apache.hyracks.control.nc.work.StateDumpWork;
@@ -98,6 +100,16 @@
CCNCFunctions.UnDeployBinaryFunction ndbf = (CCNCFunctions.UnDeployBinaryFunction) fn;
ncs.getWorkQueue().schedule(new UnDeployBinaryWork(ncs, ndbf.getDeploymentId()));
return;
+
+ case DISTRIBUTE_JOB:
+ CCNCFunctions.DistributeJobFunction djf = (CCNCFunctions.DistributeJobFunction) fn;
+ ncs.getWorkQueue().schedule(new DistributeJobWork(ncs, djf.getJobId(), djf.getacgBytes()));
+ return;
+
+ case DESTROY_JOB:
+ CCNCFunctions.DestroyJobFunction dsjf = (CCNCFunctions.DestroyJobFunction) fn;
+ ncs.getWorkQueue().schedule(new DestroyJobWork(ncs, dsjf.getJobId()));
+ return;
case STATE_DUMP_REQUEST:
final CCNCFunctions.StateDumpRequestFunction dsrf = (StateDumpRequestFunction) fn;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java
new file mode 100644
index 0000000..8043edb
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.nc.work;
+
+import java.util.Map;
+
+import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.common.work.AbstractWork;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+/**
+ * destroy a permanently distributed job
+ *
+ */
+public class DestroyJobWork extends AbstractWork {
+
+ private final NodeControllerService ncs;
+ private final JobId jobId;
+
+ public DestroyJobWork(NodeControllerService ncs, JobId jobId) {
+ this.ncs = ncs;
+ this.jobId = jobId;
+ }
+
+ @Override
+ public void run() {
+ Map<JobId, ActivityClusterGraph> acgMap = ncs.getActivityClusterGraphMap();
+ ActivityClusterGraph acg = acgMap.get(jobId);
+ if (acg == null) {
+ throw new RuntimeException("Trying to destroy a job that was never distributed!");
+ }
+ acgMap.remove(jobId);
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
new file mode 100644
index 0000000..31e2542
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.nc.work;
+
+import java.util.Map;
+
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.common.deployment.DeploymentUtils;
+import org.apache.hyracks.control.common.work.AbstractWork;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+/**
+ * pre-distribute a job that can be executed later
+ *
+ */
+public class DistributeJobWork extends AbstractWork {
+
+ private final NodeControllerService ncs;
+ private final byte[] acgBytes;
+ private final JobId jobId;
+
+ public DistributeJobWork(NodeControllerService ncs, JobId jobId, byte[] acgBytes) {
+ this.ncs = ncs;
+ this.jobId = jobId;
+ this.acgBytes = acgBytes;
+ }
+
+ @Override
+ public void run() {
+ Map<JobId, ActivityClusterGraph> acgMap = ncs.getActivityClusterGraphMap();
+ ActivityClusterGraph acg = acgMap.get(jobId);
+ if (acg != null) {
+ throw new RuntimeException("Trying to distribute a job that has already been distributed!");
+ }
+ try {
+ acg = (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, null, ncs.getApplicationContext());
+ } catch (HyracksException e) {
+ throw new RuntimeException(e);
+ }
+ acgMap.put(jobId, acg);
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index d27caf2..93b8e65 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -193,10 +193,6 @@
throw new HyracksException("Joblet was not found. This job was most likely aborted.");
}
acg = (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, deploymentId, appCtx);
- if (flags.contains(JobFlag.STORE_JOB)) {
- //TODO: Right now the map is append-only
- acgMap.put(jobId, acg);
- }
}
ji = new Joblet(ncs, deploymentId, jobId, appCtx, acg);
jobletMap.put(jobId, ji);
--
To view, visit https://asterix-gerrit.ics.uci.edu/1377
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I59c3422d5c1ab7756a6a4685ac527dfe50434954
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <[email protected]>