Steven Jacobs has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1377

Change subject: ASTERIXDB-1747 Implemented full lifecycle capabilities for 
distributed jobs
......................................................................

ASTERIXDB-1747 Implemented full lifecycle capabilities for distributed jobs

Added distribute and destroy functionality
Removed serialization and bytes when running pre-distributed jobs
Cleaned up methods

Change-Id: I59c3422d5c1ab7756a6a4685ac527dfe50434954
---
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java
A 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java
A 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
A 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java
A 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
20 files changed, 534 insertions(+), 46 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/77/1377/1

diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index 780e205..aa292f6 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -36,6 +36,8 @@
         GET_JOB_STATUS,
         GET_JOB_INFO,
         START_JOB,
+        DISTRIBUTE_JOB,
+        DESTROY_JOB,
         GET_DATASET_DIRECTORY_SERIVICE_INFO,
         GET_DATASET_RESULT_STATUS,
         GET_DATASET_RESULT_LOCATIONS,
@@ -101,6 +103,44 @@
         }
     }
 
+    public static class DistributeJobFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final byte[] acggfBytes;
+
+        public DistributeJobFunction(byte[] acggfBytes) {
+            this.acggfBytes = acggfBytes;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.DISTRIBUTE_JOB;
+        }
+
+        public byte[] getACGGFBytes() {
+            return acggfBytes;
+        }
+    }
+
+    public static class DestroyJobFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+
+        public DestroyJobFunction(JobId jobId) {
+            this.jobId = jobId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.DESTROY_JOB;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+    }
+
     public static class StartJobFunction extends Function {
         private static final long serialVersionUID = 1L;
 
@@ -116,8 +156,8 @@
             this.jobId = jobId;
         }
 
-        public StartJobFunction(byte[] acggfBytes, EnumSet<JobFlag> jobFlags, 
JobId jobId) {
-            this(null, acggfBytes, jobFlags, jobId);
+        public StartJobFunction(JobId jobId) {
+            this(null, null, null, jobId);
         }
 
         public StartJobFunction(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index c049007..8e7affb 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -69,9 +69,9 @@
     }
 
     @Override
-    public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags, JobId 
jobId) throws Exception {
+    public JobId startJob(JobId jobId) throws Exception {
         HyracksClientInterfaceFunctions.StartJobFunction sjf =
-                new 
HyracksClientInterfaceFunctions.StartJobFunction(acggfBytes, jobFlags, jobId);
+                new HyracksClientInterfaceFunctions.StartJobFunction(jobId);
         return (JobId) rpci.call(ipcHandle, sjf);
     }
 
@@ -83,6 +83,20 @@
     }
 
     @Override
+    public JobId distributeJob(byte[] acggfBytes) throws Exception {
+        HyracksClientInterfaceFunctions.DistributeJobFunction sjf =
+                new 
HyracksClientInterfaceFunctions.DistributeJobFunction(acggfBytes);
+        return (JobId) rpci.call(ipcHandle, sjf);
+    }
+
+    @Override
+    public JobId destroyJob(JobId jobId) throws Exception {
+        HyracksClientInterfaceFunctions.DestroyJobFunction sjf =
+                new HyracksClientInterfaceFunctions.DestroyJobFunction(jobId);
+        return (JobId) rpci.call(ipcHandle, sjf);
+    }
+
+    @Override
     public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception {
         HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction 
gddsf =
                 new 
HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction();
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index eb92c37..10e45d8 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -103,19 +103,28 @@
     }
 
     @Override
-    public JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags, 
JobId jobId) throws Exception {
+    public JobId distributeJob(JobSpecification jobSpec) throws Exception {
         JobSpecificationActivityClusterGraphGeneratorFactory jsacggf =
                 new 
JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec);
-        return startJob(jsacggf, jobFlags, jobId);
+        return distributeJob(jsacggf);
+    }
+
+    @Override
+    public JobId destroyJob(JobId jobId) throws Exception {
+        return hci.destroyJob(jobId);
+    }
+
+    @Override
+    public JobId startJob(JobId jobId) throws Exception {
+        return hci.startJob(jobId);
     }
 
     public JobId startJob(IActivityClusterGraphGeneratorFactory acggf, 
EnumSet<JobFlag> jobFlags) throws Exception {
         return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags);
     }
 
-    public JobId startJob(IActivityClusterGraphGeneratorFactory acggf, 
EnumSet<JobFlag> jobFlags, JobId jobId)
-            throws Exception {
-        return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags, 
jobId);
+    public JobId distributeJob(IActivityClusterGraphGeneratorFactory acggf) 
throws Exception {
+        return hci.distributeJob(JavaSerializationUtils.serialize(acggf));
     }
 
     public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
index 031896e..dc51324 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
@@ -91,11 +91,27 @@
      *            Job Specification
      * @param jobFlags
      *            Flags
-     * @param jobId
-     *            Used to run a pre-distributed job by id (the same value will 
be returned)
      * @throws Exception
      */
-    public JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags, 
JobId jobId) throws Exception;
+    public JobId distributeJob(JobSpecification jobSpec) throws Exception;
+
+    /**
+     * Destroy the distributed graph for a pre-distributed job
+     *
+     * @param jobId
+     *            The id of the predistributed job
+     * @throws Exception
+     */
+    public JobId destroyJob(JobId jobId) throws Exception;
+
+    /**
+     * Used to run a pre-distributed job by id (the same JobId will be 
returned)
+     *
+     * @param jobId
+     *            The id of the predistributed job
+     * @throws Exception
+     */
+    public JobId startJob(JobId jobId) throws Exception;
 
     /**
      * Start the specified Job.
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
index 39063c6..f7995d7 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
@@ -38,7 +38,11 @@
 
     public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws 
Exception;
 
-    public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags, JobId 
jobId) throws Exception;
+    public JobId startJob(JobId jobId) throws Exception;
+
+    public JobId distributeJob(byte[] acggfBytes) throws Exception;
+
+    public JobId destroyJob(JobId jobId) throws Exception;
 
     public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception;
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
index 1656c51..a33c6c9 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
@@ -19,6 +19,5 @@
 package org.apache.hyracks.api.job;
 
 public enum JobFlag {
-    PROFILE_RUNTIME,
-    STORE_JOB
+    PROFILE_RUNTIME
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
index 26beb63..67f75ff 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -31,6 +31,8 @@
 import org.apache.hyracks.control.cc.work.CliDeployBinaryWork;
 import org.apache.hyracks.control.cc.work.CliUnDeployBinaryWork;
 import org.apache.hyracks.control.cc.work.ClusterShutdownWork;
+import org.apache.hyracks.control.cc.work.DestroyJobWork;
+import org.apache.hyracks.control.cc.work.DistributeJobWork;
 import org.apache.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork;
 import org.apache.hyracks.control.cc.work.GetJobInfoWork;
 import org.apache.hyracks.control.cc.work.GetJobStatusWork;
@@ -81,18 +83,35 @@
                 ccs.getWorkQueue().schedule(new GetJobInfoWork(ccs, 
gjif.getJobId(),
                         new IPCResponder<JobInfo>(handle, mid)));
                 break;
+            case DISTRIBUTE_JOB:
+                HyracksClientInterfaceFunctions.DistributeJobFunction djf =
+                        
(HyracksClientInterfaceFunctions.DistributeJobFunction) fn;
+                ccs.getWorkQueue().schedule(new DistributeJobWork(ccs, 
djf.getACGGFBytes(), jobIdFactory.create(),
+                        new IPCResponder<JobId>(handle, mid)));
+                break;
+            case DESTROY_JOB:
+                HyracksClientInterfaceFunctions.DestroyJobFunction dsjf =
+                        (HyracksClientInterfaceFunctions.DestroyJobFunction) 
fn;
+                ccs.getWorkQueue()
+                        .schedule(new DestroyJobWork(ccs, dsjf.getJobId(), new 
IPCResponder<JobId>(handle, mid)));
+                break;
             case START_JOB:
                 HyracksClientInterfaceFunctions.StartJobFunction sjf =
                         (HyracksClientInterfaceFunctions.StartJobFunction) fn;
                 JobId jobId = sjf.getJobId();
                 byte[] acggfBytes = null;
+                boolean predistributed = false;
                 if (jobId == null) {
+                    //The job is new
                     jobId = jobIdFactory.create();
+                    acggfBytes = sjf.getACGGFBytes();
                 }
-                //TODO: only send these when the jobId is null
-                acggfBytes = sjf.getACGGFBytes();
+                else {
+                    //The job has been predistributed. We don't need to send 
an ActivityClusterGraph
+                    predistributed = true;
+                }
                 ccs.getWorkQueue().schedule(new JobStartWork(ccs, 
sjf.getDeploymentId(), acggfBytes, sjf.getJobFlags(),
-                        jobId, new IPCResponder<JobId>(handle, mid)));
+                        jobId, new IPCResponder<JobId>(handle, mid), 
predistributed));
                 break;
             case GET_DATASET_DIRECTORY_SERIVICE_INFO:
                 ccs.getWorkQueue().schedule(new 
GetDatasetDirectoryServiceInfoWork(ccs,
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 5fdcede..9a4078a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -25,6 +25,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Hashtable;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -40,9 +41,11 @@
 import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
 import org.apache.hyracks.api.client.ClusterControllerInfo;
 import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.constraints.Constraint;
 import org.apache.hyracks.api.context.ICCContext;
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.ActivityClusterGraph;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.api.topology.ClusterTopology;
@@ -101,6 +104,10 @@
 
     private final Map<JobId, List<Exception>> runMapHistory;
 
+    private final Map<JobId, ActivityClusterGraph> activityClusterGraphMap;
+
+    private final Map<JobId, Set<Constraint>> 
activityClusterGraphConstraintsMap;
+
     private final WorkQueue workQueue;
 
     private ExecutorService executor;
@@ -138,6 +145,8 @@
                 new JavaSerializationBasedPayloadSerializerDeserializer());
         webServer = new WebServer(this);
         activeRunMap = new HashMap<>();
+        activityClusterGraphMap = new Hashtable<>();
+        activityClusterGraphConstraintsMap = new Hashtable<>();
         runMapArchive = new LinkedHashMap<JobId, JobRun>() {
             private static final long serialVersionUID = 1L;
 
@@ -313,6 +322,14 @@
         return runMapHistory;
     }
 
+    public Map<JobId, ActivityClusterGraph> getActivityClusterGraphMap() {
+        return activityClusterGraphMap;
+    }
+
+    public Map<JobId, Set<Constraint>> getActivityClusterGraphConstraintsMap() 
{
+        return activityClusterGraphConstraintsMap;
+    }
+
     public Map<InetAddress, Set<String>> getIpAddressNodeNameMap() {
         return ipAddressNodeNameMap;
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
index dbe4202..b788d10 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
@@ -20,7 +20,6 @@
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
-import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -28,10 +27,7 @@
 import java.util.Map;
 import java.util.Set;
 
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
+import org.apache.hyracks.api.constraints.Constraint;
 import org.apache.hyracks.api.dataflow.ActivityId;
 import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
 import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
@@ -53,17 +49,20 @@
 import org.apache.hyracks.control.cc.scheduler.JobScheduler;
 import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
 import org.apache.hyracks.control.common.utils.ExceptionUtils;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
 
 public class JobRun implements IJobStatusConditionVariable {
     private final DeploymentId deploymentId;
 
     private final JobId jobId;
 
-    private final IActivityClusterGraphGenerator acgg;
+    private IActivityClusterGraphGenerator acgg;
 
-    private final ActivityClusterGraph acg;
+    private ActivityClusterGraph acg;
 
-    private final JobScheduler scheduler;
+    private JobScheduler scheduler;
 
     private final EnumSet<JobFlag> jobFlags;
 
@@ -95,13 +94,9 @@
 
     private Map<OperatorDescriptorId, Map<Integer, String>> operatorLocations;
 
-    public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, 
JobId jobId,
-            IActivityClusterGraphGenerator acgg, EnumSet<JobFlag> jobFlags) {
+    private JobRun(ClusterControllerService ccs, DeploymentId deploymentId, 
JobId jobId, EnumSet<JobFlag> jobFlags) {
         this.deploymentId = deploymentId;
         this.jobId = jobId;
-        this.acgg = acgg;
-        this.acg = acgg.initialize();
-        this.scheduler = new JobScheduler(ccs, this, acgg.getConstraints());
         this.jobFlags = jobFlags;
         activityClusterPlanMap = new HashMap<ActivityClusterId, 
ActivityClusterPlan>();
         pmm = new PartitionMatchMaker();
@@ -111,6 +106,32 @@
         connectorPolicyMap = new HashMap<ConnectorDescriptorId, 
IConnectorPolicy>();
         operatorLocations = new HashMap<OperatorDescriptorId, Map<Integer, 
String>>();
         createTime = System.currentTimeMillis();
+
+    }
+
+    //Run a Pre-distributed job by passing the ActivityClusterGraph
+    public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, 
JobId jobId)
+            throws HyracksException {
+        this(ccs, deploymentId, jobId, EnumSet.noneOf(JobFlag.class));
+        Map<JobId, ActivityClusterGraph> acgMap = 
ccs.getActivityClusterGraphMap();
+        Map<JobId, Set<Constraint>> acgConstaintsMap = 
ccs.getActivityClusterGraphConstraintsMap();
+        ActivityClusterGraph entry = acgMap.get(jobId);
+        Set<Constraint> constaints = acgConstaintsMap.get(jobId);
+        if (entry == null || constaints == null) {
+            throw new HyracksException("Trying to run a pre-destributed job 
with no cluster map");
+        }
+        this.acg = entry;
+        this.acgg = null;
+        this.scheduler = new JobScheduler(ccs, this, constaints, true);
+    }
+
+    //Run a new job by creating an ActivityClusterGraph
+    public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, 
JobId jobId,
+            IActivityClusterGraphGenerator acgg, EnumSet<JobFlag> jobFlags) {
+        this(ccs, deploymentId, jobId, jobFlags);
+        this.acgg = acgg;
+        this.acg = acgg.initialize();
+        this.scheduler = new JobScheduler(ccs, this, acgg.getConstraints(), 
false);
     }
 
     public DeploymentId getDeploymentId() {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java
index ab026eb..db24d7d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java
@@ -31,9 +31,6 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.json.JSONException;
-import org.json.JSONObject;
-
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.constraints.Constraint;
 import 
org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression;
@@ -66,6 +63,8 @@
 import org.apache.hyracks.control.cc.work.JobCleanupWork;
 import org.apache.hyracks.control.common.job.PartitionState;
 import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
+import org.json.JSONException;
+import org.json.JSONObject;
 
 public class JobScheduler {
     private static final Logger LOGGER = 
Logger.getLogger(JobScheduler.class.getName());
@@ -76,13 +75,17 @@
 
     private final PartitionConstraintSolver solver;
 
+    private final boolean predistributed;
+
     private final Map<PartitionId, TaskCluster> 
partitionProducingTaskClusterMap;
 
     private final Set<TaskCluster> inProgressTaskClusters;
 
-    public JobScheduler(ClusterControllerService ccs, JobRun jobRun, 
Collection<Constraint> constraints) {
+    public JobScheduler(ClusterControllerService ccs, JobRun jobRun, 
Collection<Constraint> constraints,
+            boolean predistributed) {
         this.ccs = ccs;
         this.jobRun = jobRun;
+        this.predistributed = predistributed;
         solver = new PartitionConstraintSolver();
         partitionProducingTaskClusterMap = new HashMap<PartitionId, 
TaskCluster>();
         inProgressTaskClusters = new HashSet<TaskCluster>();
@@ -465,7 +468,6 @@
         final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = 
new HashMap<ConnectorDescriptorId, IConnectorPolicy>(
                 jobRun.getConnectorPolicyMap());
         try {
-            byte[] acgBytes = JavaSerializationUtils.serialize(acg);
             for (Map.Entry<String, List<TaskAttemptDescriptor>> entry : 
taskAttemptMap.entrySet()) {
                 String nodeId = entry.getKey();
                 final List<TaskAttemptDescriptor> taskDescriptors = 
entry.getValue();
@@ -476,6 +478,10 @@
                     if (LOGGER.isLoggable(Level.FINE)) {
                         LOGGER.fine("Starting: " + taskDescriptors + " at " + 
entry.getKey());
                     }
+                    byte[] acgBytes = null;
+                    if (!predistributed && changed) {
+                        acgBytes = JavaSerializationUtils.serialize(acg);
+                    }
                     byte[] jagBytes = changed ? acgBytes : null;
                     node.getNodeController().startTasks(deploymentId, jobId, 
jagBytes, taskDescriptors,
                             connectorPolicies, jobRun.getFlags());
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java
new file mode 100644
index 0000000..d565511
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc.work;
+
+import java.util.Map;
+
+import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.common.work.IResultCallback;
+import org.apache.hyracks.control.common.work.SynchronizableWork;
+
+public class DestroyJobWork extends SynchronizableWork {
+    private final ClusterControllerService ccs;
+    private final JobId jobId;
+    private final IResultCallback<JobId> callback;
+
+    public DestroyJobWork(ClusterControllerService ccs, JobId jobId, 
IResultCallback<JobId> callback) {
+        this.jobId = jobId;
+        this.ccs = ccs;
+        this.callback = callback;
+    }
+
+    @Override
+    protected void doRun() throws Exception {
+        try {
+            Map<JobId, ActivityClusterGraph> acgMap = 
ccs.getActivityClusterGraphMap();
+            ActivityClusterGraph acg = acgMap.get(jobId);
+            if (acg == null) {
+                throw new RuntimeException("Trying to destroy a job that was 
never distributed!");
+            }
+            acgMap.remove(jobId);
+            for (NodeControllerState node : ccs.getNodeMap().values()) {
+                node.getNodeController().destroyJob(jobId);
+            }
+            callback.setValue(jobId);
+        } catch (Exception e) {
+            callback.setException(e);
+        }
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
new file mode 100644
index 0000000..8ec315c
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc.work;
+
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hyracks.api.constraints.Constraint;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.job.IActivityClusterGraphGenerator;
+import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.util.JavaSerializationUtils;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.application.CCApplicationContext;
+import org.apache.hyracks.control.common.deployment.DeploymentUtils;
+import org.apache.hyracks.control.common.work.IResultCallback;
+import org.apache.hyracks.control.common.work.SynchronizableWork;
+
+public class DistributeJobWork extends SynchronizableWork {
+    private final ClusterControllerService ccs;
+    private final byte[] acggfBytes;
+    private final JobId jobId;
+    private final IResultCallback<JobId> callback;
+
+    public DistributeJobWork(ClusterControllerService ccs, byte[] acggfBytes, 
JobId jobId,
+            IResultCallback<JobId> callback) {
+        this.jobId = jobId;
+        this.ccs = ccs;
+        this.acggfBytes = acggfBytes;
+        this.callback = callback;
+    }
+
+    @Override
+    protected void doRun() throws Exception {
+        try {
+            final CCApplicationContext appCtx = ccs.getApplicationContext();
+            Map<JobId, ActivityClusterGraph> acgMap = 
ccs.getActivityClusterGraphMap();
+            Map<JobId, Set<Constraint>> acgConstaintsMap = 
ccs.getActivityClusterGraphConstraintsMap();
+            ActivityClusterGraph entry = acgMap.get(jobId);
+            Set<Constraint> constaints = acgConstaintsMap.get(jobId);
+            if (entry != null || constaints != null) {
+                throw new HyracksException("Trying to distribute a job with a 
duplicate jobId");
+            }
+            IActivityClusterGraphGeneratorFactory acggf =
+                    (IActivityClusterGraphGeneratorFactory) 
DeploymentUtils.deserialize(acggfBytes, null, appCtx);
+            IActivityClusterGraphGenerator acgg =
+                    acggf.createActivityClusterGraphGenerator(jobId, appCtx, 
EnumSet.noneOf(JobFlag.class));
+            ActivityClusterGraph acg = acgg.initialize();
+            acgMap.put(jobId, acg);
+            acgConstaintsMap.put(jobId, acgg.getConstraints());
+
+            appCtx.notifyJobCreation(jobId, acggf);
+
+            byte[] acgBytes = JavaSerializationUtils.serialize(acg);
+            for (NodeControllerState node : ccs.getNodeMap().values()) {
+                node.getNodeController().distributeJob(jobId, acgBytes);
+            }
+
+            callback.setValue(jobId);
+        } catch (Exception e) {
+            callback.setException(e);
+        }
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
index e7844e9..6b7f26f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
@@ -41,29 +41,42 @@
     private final DeploymentId deploymentId;
     private final JobId jobId;
     private final IResultCallback<JobId> callback;
+    private final boolean predestributed;
 
     public JobStartWork(ClusterControllerService ccs, DeploymentId 
deploymentId, byte[] acggfBytes,
-            EnumSet<JobFlag> jobFlags, JobId jobId, IResultCallback<JobId> 
callback) {
+            EnumSet<JobFlag> jobFlags, JobId jobId, IResultCallback<JobId> 
callback, boolean predestributed) {
         this.deploymentId = deploymentId;
         this.jobId = jobId;
         this.ccs = ccs;
         this.acggfBytes = acggfBytes;
         this.jobFlags = jobFlags;
         this.callback = callback;
+        this.predestributed = predestributed;
     }
 
     @Override
     protected void doRun() throws Exception {
         try {
             final CCApplicationContext appCtx = ccs.getApplicationContext();
-            IActivityClusterGraphGeneratorFactory acggf = 
(IActivityClusterGraphGeneratorFactory) DeploymentUtils
-                    .deserialize(acggfBytes, deploymentId, appCtx);
-            IActivityClusterGraphGenerator acgg = 
acggf.createActivityClusterGraphGenerator(jobId, appCtx, jobFlags);
-            JobRun run = new JobRun(ccs, deploymentId, jobId, acgg, jobFlags);
+            JobRun run;
+            IActivityClusterGraphGeneratorFactory acggf = null;
+            if (!predestributed) {
+                //Need to create the ActivityClusterGraph
+                acggf = (IActivityClusterGraphGeneratorFactory) DeploymentUtils
+                        .deserialize(acggfBytes, deploymentId, appCtx);
+                IActivityClusterGraphGenerator acgg =
+                        acggf.createActivityClusterGraphGenerator(jobId, 
appCtx, jobFlags);
+                run = new JobRun(ccs, deploymentId, jobId, acgg, jobFlags);
+            } else {
+                //ActivityClusterGraph has already been distributed
+                run = new JobRun(ccs, deploymentId, jobId);
+            }
             run.setStatus(JobStatus.INITIALIZED, null);
             run.setStartTime(System.currentTimeMillis());
             ccs.getActiveRunMap().put(jobId, run);
-            appCtx.notifyJobCreation(jobId, acggf);
+            if (!predestributed) {
+                appCtx.notifyJobCreation(jobId, acggf);
+            }
             run.setStatus(JobStatus.RUNNING, null);
             try {
                 run.getScheduler().startJob();
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index dff5827..aaea856 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -49,6 +49,10 @@
 
     public void undeployBinary(DeploymentId deploymentId) throws Exception;
 
+    public void distributeJob(JobId jobId, byte[] planBytes) throws Exception;
+
+    public void destroyJob(JobId jobId) throws Exception;
+
     public void dumpState(String stateDumpId) throws Exception;
 
     public void shutdown(boolean terminateNCService) throws Exception;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index aa9a4fe..18e1b79 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -98,6 +98,9 @@
         SHUTDOWN_REQUEST,
         SHUTDOWN_RESPONSE,
 
+        DISTRIBUTE_JOB,
+        DESTROY_JOB,
+
         STATE_DUMP_REQUEST,
         STATE_DUMP_RESPONSE,
 
@@ -668,6 +671,51 @@
         }
     }
 
+    public static class DistributeJobFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+
+        private final byte[] acgBytes;
+
+        public DistributeJobFunction(JobId jobId, byte[] acgBytes) {
+            this.jobId = jobId;
+            this.acgBytes = acgBytes;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.DISTRIBUTE_JOB;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public byte[] getacgBytes() {
+            return acgBytes;
+        }
+    }
+
+    public static class DestroyJobFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+
+        public DestroyJobFunction(JobId jobId) {
+            this.jobId = jobId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.DESTROY_JOB;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+    }
+
     public static class StartTasksFunction extends Function {
         private static final long serialVersionUID = 1L;
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index c3376e6..aa96bdb 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -84,6 +84,18 @@
     }
 
     @Override
+    public void distributeJob(JobId jobId, byte[] planBytes) throws Exception {
+        CCNCFunctions.DistributeJobFunction fn = new 
CCNCFunctions.DistributeJobFunction(jobId, planBytes);
+        ipcHandle.send(-1, fn, null);
+    }
+
+    @Override
+    public void destroyJob(JobId jobId) throws Exception {
+        CCNCFunctions.DestroyJobFunction fn = new 
CCNCFunctions.DestroyJobFunction(jobId);
+        ipcHandle.send(-1, fn, null);
+    }
+
+    @Override
     public void dumpState(String stateDumpId) throws Exception {
         CCNCFunctions.StateDumpRequestFunction dsf = new 
CCNCFunctions.StateDumpRequestFunction(stateDumpId);
         ipcHandle.send(-1, dsf, null);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index 93ccaa4..9a58cc5 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -26,6 +26,8 @@
 import org.apache.hyracks.control.nc.work.ApplicationMessageWork;
 import org.apache.hyracks.control.nc.work.CleanupJobletWork;
 import org.apache.hyracks.control.nc.work.DeployBinaryWork;
+import org.apache.hyracks.control.nc.work.DestroyJobWork;
+import org.apache.hyracks.control.nc.work.DistributeJobWork;
 import org.apache.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
 import org.apache.hyracks.control.nc.work.StartTasksWork;
 import org.apache.hyracks.control.nc.work.StateDumpWork;
@@ -98,6 +100,16 @@
                 CCNCFunctions.UnDeployBinaryFunction ndbf = 
(CCNCFunctions.UnDeployBinaryFunction) fn;
                 ncs.getWorkQueue().schedule(new UnDeployBinaryWork(ncs, 
ndbf.getDeploymentId()));
                 return;
+                
+            case DISTRIBUTE_JOB:
+                CCNCFunctions.DistributeJobFunction djf = 
(CCNCFunctions.DistributeJobFunction) fn;
+                ncs.getWorkQueue().schedule(new DistributeJobWork(ncs, 
djf.getJobId(), djf.getacgBytes()));
+                return;
+
+            case DESTROY_JOB:
+                CCNCFunctions.DestroyJobFunction dsjf = 
(CCNCFunctions.DestroyJobFunction) fn;
+                ncs.getWorkQueue().schedule(new DestroyJobWork(ncs, 
dsjf.getJobId()));
+                return;
 
             case STATE_DUMP_REQUEST:
                 final CCNCFunctions.StateDumpRequestFunction dsrf = 
(StateDumpRequestFunction) fn;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java
new file mode 100644
index 0000000..8043edb
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.nc.work;
+
+import java.util.Map;
+
+import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.common.work.AbstractWork;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
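+/**
+ * Destroys a permanently distributed job by removing its activity cluster
+ * graph from the node controller's map; fails if the job was never distributed.
+ */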
+public class DestroyJobWork extends AbstractWork {
+
+    private final NodeControllerService ncs;
+    private final JobId jobId;
+
+    public DestroyJobWork(NodeControllerService ncs, JobId jobId) {
+        this.ncs = ncs;
+        this.jobId = jobId;
+    }
+
+    @Override
+    public void run() {
+        Map<JobId, ActivityClusterGraph> acgMap = 
ncs.getActivityClusterGraphMap();
+        ActivityClusterGraph acg = acgMap.get(jobId);
+        if (acg == null) {
+            throw new RuntimeException("Trying to destroy a job that was never 
distributed!");
+        }
+        acgMap.remove(jobId);
+    }
+
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
new file mode 100644
index 0000000..31e2542
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.nc.work;
+
+import java.util.Map;
+
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.common.deployment.DeploymentUtils;
+import org.apache.hyracks.control.common.work.AbstractWork;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+/**
+ * Pre-distributes a job: deserializes its activity cluster graph and stores it
+ * in the node controller's map so that the job can be executed later.
+ */
+public class DistributeJobWork extends AbstractWork {
+
+    private final NodeControllerService ncs;
+    private final byte[] acgBytes;
+    private final JobId jobId;
+
+    public DistributeJobWork(NodeControllerService ncs, JobId jobId, byte[] 
acgBytes) {
+        this.ncs = ncs;
+        this.jobId = jobId;
+        this.acgBytes = acgBytes;
+    }
+
+    @Override
+    public void run() {
+        Map<JobId, ActivityClusterGraph> acgMap = 
ncs.getActivityClusterGraphMap();
+        ActivityClusterGraph acg = acgMap.get(jobId);
+        if (acg != null) {
+            throw new RuntimeException("Trying to distribute a job that has 
already been distributed!");
+        }
+        try {
+            acg = (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, 
null, ncs.getApplicationContext());
+        } catch (HyracksException e) {
+            throw new RuntimeException(e);
+        }
+        acgMap.put(jobId, acg);
+    }
+
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index d27caf2..93b8e65 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -193,10 +193,6 @@
                     throw new HyracksException("Joblet was not found. This job 
was most likely aborted.");
                 }
                 acg = (ActivityClusterGraph) 
DeploymentUtils.deserialize(acgBytes, deploymentId, appCtx);
-                if (flags.contains(JobFlag.STORE_JOB)) {
-                    //TODO: Right now the map is append-only
-                    acgMap.put(jobId, acg);
-                }
             }
             ji = new Joblet(ncs, deploymentId, jobId, appCtx, acg);
             jobletMap.put(jobId, ji);

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1377
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I59c3422d5c1ab7756a6a4685ac527dfe50434954
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <[email protected]>

Reply via email to