Steven Jacobs has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2045

Change subject: Long Running Jobs Changes
......................................................................

Long Running Jobs Changes

Change-Id: I8f493c1fa977d07dfe8a875f9ebe9515d01d1473
---
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveRedundantSelectRule.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
A 
asterixdb/asterix-app/src/test/resources/optimizerts/queries_sqlpp/not-is-missing-constant.sqlpp
A 
asterixdb/asterix-app/src/test/resources/optimizerts/results/not-is-missing-constant.plan
A 
asterixdb/asterix-app/src/test/resources/optimizerts/results_parser_sqlpp/not-is-missing-constant.ast
M 
asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
M 
asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java
A 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterDescriptor.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterGraph.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterId.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
A 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobParameterByteStore.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributedJobFailureWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
M 
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
M 
hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
49 files changed, 698 insertions(+), 270 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/45/2045/1

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index d94a045..f9245a9 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -218,6 +218,7 @@
 
         condPushDownAndJoinInference.add(new RemoveRedundantVariablesRule());
         condPushDownAndJoinInference.add(new AsterixInlineVariablesRule());
+        condPushDownAndJoinInference.add(new RemoveRedundantSelectRule());
         condPushDownAndJoinInference.add(new 
RemoveUnusedAssignAndAggregateRule());
 
         condPushDownAndJoinInference.add(new 
FactorRedundantGroupAndDecorVarsRule());
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveRedundantSelectRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveRedundantSelectRule.java
index fe9e49e..a27b143 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveRedundantSelectRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveRedundantSelectRule.java
@@ -19,15 +19,17 @@
 
 package org.apache.asterix.optimizer.rules;
 
-import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.asterix.om.base.ABoolean;
 import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
@@ -57,7 +59,7 @@
         }
         SelectOperator select = (SelectOperator) op;
         ILogicalExpression cond = select.getCondition().getValue();
-        if (alwaysHold(cond)) {
+        if (alwaysHold(cond, context)) {
             opRef.setValue(select.getInputs().get(0).getValue());
             return true;
         }
@@ -70,13 +72,31 @@
      * @param cond
      * @return true if the condition always holds; false otherwise.
      */
-    private boolean alwaysHold(ILogicalExpression cond) {
+    private boolean alwaysHold(ILogicalExpression cond, IOptimizationContext 
context) {
         if (cond.equals(ConstantExpression.TRUE)) {
             return true;
         }
         if (cond.equals(new ConstantExpression(new 
AsterixConstantValue(ABoolean.TRUE)))) {
             return true;
         }
+        /* is-missing is always false for constant values
+         * so not(is-missing(CONST)) is always true
+         * this situation arises when a left outer join with an "is-missing" 
filtering function call
+         * is converted to an inner join. See PushSelectIntoJoinRule
+         */
+        if (cond.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL
+                && ((AbstractFunctionCallExpression) 
cond).getFunctionIdentifier() == BuiltinFunctions.NOT) {
+            ILogicalExpression subExpr = ((AbstractFunctionCallExpression) 
cond).getArguments().get(0).getValue();
+            if (subExpr.getExpressionTag() == 
LogicalExpressionTag.FUNCTION_CALL
+                    && ((AbstractFunctionCallExpression) subExpr)
+                            .getFunctionIdentifier() == 
BuiltinFunctions.IS_MISSING) {
+                subExpr = ((AbstractFunctionCallExpression) 
subExpr).getArguments().get(0).getValue();
+                if (subExpr.getExpressionTag() == 
LogicalExpressionTag.CONSTANT) {
+                    return true;
+                }
+            }
+
+        }
         return false;
     }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 29135d3..bba42bb 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -731,7 +731,7 @@
         return DatasetUtil.createNodeGroupForNewDataset(dataverseName, 
datasetName, selectedNodes, metadataProvider);
     }
 
-    protected void handleCreateIndexStatement(MetadataProvider 
metadataProvider, Statement stmt,
+    public void handleCreateIndexStatement(MetadataProvider metadataProvider, 
Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
         CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
         String dataverseName = 
getActiveDataverse(stmtCreateIndex.getDataverseName());
@@ -1623,7 +1623,7 @@
         CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
         String dataverse = 
getActiveDataverseName(cfs.getSignature().getNamespace());
         cfs.getSignature().setNamespace(dataverse);
-        String functionName = cfs.getaAterixFunction().getName();
+        String functionName = cfs.getSignature().getName();
 
         MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -1634,7 +1634,9 @@
             if (dv == null) {
                 throw new AlgebricksException("There is no dataverse with this 
name " + dataverse + ".");
             }
-            Function function = new Function(dataverse, functionName, 
cfs.getaAterixFunction().getArity(),
+
+            // If the function body contains function calls, theirs reference 
count won't be increased.
+            Function function = new Function(dataverse, functionName, 
cfs.getSignature().getArity(),
                     cfs.getParamList(), Function.RETURNTYPE_VOID, 
cfs.getFunctionBody(), Function.LANGUAGE_AQL,
                     FunctionKind.SCALAR.toString());
             MetadataManager.INSTANCE.addFunction(mdTxnCtx, function);
diff --git 
a/asterixdb/asterix-app/src/test/resources/optimizerts/queries_sqlpp/not-is-missing-constant.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/queries_sqlpp/not-is-missing-constant.sqlpp
new file mode 100644
index 0000000..2bdf1d9
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/queries_sqlpp/not-is-missing-constant.sqlpp
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ //This test ensure that not(is-missing(CONST)) is handled properly
+
+drop dataverse channels if exists;
+create dataverse channels;
+use channels;
+
+create type userLocation as {
+  userId: int,
+  roomNumber: int
+};
+
+create type result as {
+  id:uuid
+};
+
+create type subscriptionType as {
+  subscriptionId:uuid,
+  param0:int
+};
+
+create dataset roomRecordsResults(result)
+primary key id autogenerated;
+
+create dataset roomRecordsSubscriptions(subscriptionType)
+primary key subscriptionId autogenerated;
+
+create dataset UserLocations(userLocation)
+primary key userId;
+
+create function RoomOccupants(room) {
+  (select location.userId
+  from UserLocations location
+  where location.roomNumber = room)
+};
+
+use channels;
+SET inline_with "false";
+insert into channels.roomRecordsResults as a (
+  with channelExecutionTime as current_datetime()
+  select result, channelExecutionTime, sub.subscriptionId as 
subscriptionId,current_datetime() as deliveryTime
+  from channels.roomRecordsSubscriptions sub,
+  channels.RoomOccupants(sub.param0) result 
+) returning a;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/not-is-missing-constant.plan
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/not-is-missing-constant.plan
new file mode 100644
index 0000000..709b2b0
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/not-is-missing-constant.plan
@@ -0,0 +1,32 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- COMMIT  |PARTITIONED|
+        -- STREAM_PROJECT  |PARTITIONED|
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            -- INSERT_DELETE  |PARTITIONED|
+              -- HASH_PARTITION_EXCHANGE [$$19]  |PARTITIONED|
+                -- ASSIGN  |PARTITIONED|
+                  -- ASSIGN  |PARTITIONED|
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      -- ASSIGN  |PARTITIONED|
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          -- ASSIGN  |PARTITIONED|
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- HYBRID_HASH_JOIN [$$26][$$25]  |PARTITIONED|
+                                  -- HASH_PARTITION_EXCHANGE [$$26]  
|PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ASSIGN  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- DATASOURCE_SCAN  |PARTITIONED|
+                                            -- BROADCAST_EXCHANGE  
|PARTITIONED|
+                                              -- ASSIGN  |UNPARTITIONED|
+                                                -- EMPTY_TUPLE_SOURCE  
|UNPARTITIONED|
+                                  -- HASH_PARTITION_EXCHANGE [$$25]  
|PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ASSIGN  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- DATASOURCE_SCAN  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
+                                              -- EMPTY_TUPLE_SOURCE  
|PARTITIONED|
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results_parser_sqlpp/not-is-missing-constant.ast
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results_parser_sqlpp/not-is-missing-constant.ast
new file mode 100644
index 0000000..847539c
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results_parser_sqlpp/not-is-missing-constant.ast
@@ -0,0 +1,23 @@
+DataverseUse channels
+TypeDecl userLocation [
+  open RecordType {
+    userId : int64,
+    roomNumber : int64
+  }
+]
+TypeDecl result [
+  open RecordType {
+    id : uuid
+  }
+]
+TypeDecl subscriptionType [
+  open RecordType {
+    subscriptionId : uuid,
+    param0 : int64
+  }
+]
+DatasetDecl roomRecordsResults(result) partitioned by [[id]] [autogenerated]
+DatasetDecl roomRecordsSubscriptions(subscriptionType) partitioned by 
[[subscriptionId]] [autogenerated]
+DatasetDecl UserLocations(userLocation) partitioned by [[userId]]
+DataverseUse channels
+Set inline_with=false
\ No newline at end of file
diff --git 
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
 
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
index 6d74957..2b7cad6 100644
--- 
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
+++ 
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
@@ -34,10 +34,6 @@
     private final boolean ifNotExists;
     private final List<String> paramList;
 
-    public FunctionSignature getaAterixFunction() {
-        return signature;
-    }
-
     public String getFunctionBody() {
         return functionBody;
     }
diff --git 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
index 863847b..7addf9d 100644
--- 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
+++ 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
@@ -864,6 +864,9 @@
     public static final FunctionIdentifier EXTERNAL_LOOKUP = new 
FunctionIdentifier(FunctionConstants.ASTERIX_NS,
             "external-lookup", FunctionIdentifier.VARARGS);
 
+    public static final FunctionIdentifier GET_JOB_PARAMETER =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, 
"get-job-param", 1);
+
     public static final FunctionIdentifier META = new 
FunctionIdentifier(FunctionConstants.ASTERIX_NS, "meta",
             FunctionIdentifier.VARARGS);
     public static final FunctionIdentifier META_KEY = new 
FunctionIdentifier(FunctionConstants.ASTERIX_NS, "meta-key",
@@ -1274,6 +1277,9 @@
         // external lookup
         addPrivateFunction(EXTERNAL_LOOKUP, AnyTypeComputer.INSTANCE, false);
 
+        // get job parameter
+        addFunction(GET_JOB_PARAMETER, AnyTypeComputer.INSTANCE, false);
+
         // unnesting function
         addPrivateFunction(SCAN_COLLECTION, 
CollectionMemberResultType.INSTANCE, true);
 
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java
index 1f9909c..4a421cc 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java
@@ -22,8 +22,8 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.asterix.runtime.exceptions.TypeMismatchException;
 import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.runtime.exceptions.TypeMismatchException;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
@@ -48,7 +48,7 @@
     final GrowableArray resultArray = new GrowableArray();
     final UTF8StringBuilder resultBuilder = new UTF8StringBuilder();
     private final ArrayBackedValueStorage resultStorage = new 
ArrayBackedValueStorage();
-    private final DataOutput dataOutput = resultStorage.getDataOutput();
+    protected final DataOutput dataOutput = resultStorage.getDataOutput();
     private final FunctionIdentifier funcID;
 
     AbstractUnaryStringStringEval(IHyracksTaskContext context, 
IScalarEvaluatorFactory argEvalFactory,
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterDescriptor.java
new file mode 100644
index 0000000..9d73dcc
--- /dev/null
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterDescriptor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.evaluators.functions;
+
+import java.io.IOException;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptor;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import 
org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
+
+public class GetJobParameterDescriptor extends 
AbstractScalarFunctionDynamicDescriptor {
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new 
IFunctionDescriptorFactory() {
+        @Override
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new GetJobParameterDescriptor();
+        }
+    };
+
+    @Override
+    public IScalarEvaluatorFactory 
createEvaluatorFactory(IScalarEvaluatorFactory[] args) {
+        return new IScalarEvaluatorFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IScalarEvaluator createScalarEvaluator(IHyracksTaskContext 
ctx) throws HyracksDataException {
+                return new AbstractUnaryStringStringEval(ctx, args[0],
+                        GetJobParameterDescriptor.this.getIdentifier()) {
+                    private byte[] result;
+
+                    @Override
+                    protected void process(UTF8StringPointable inputString, 
IPointable resultPointable)
+                            throws IOException {
+                        result = 
ctx.getJobParameter(inputString.getByteArray(), inputString.getStartOffset(),
+                                inputString.getLength());
+                    }
+
+                    @Override
+                    void writeResult(IPointable resultPointable) throws 
IOException {
+                        resultPointable.set(result, 0, result.length);
+                    }
+                };
+            }
+        };
+    }
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return BuiltinFunctions.GET_JOB_PARAMETER;
+    }
+
+}
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
index c02732f..d831050 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
@@ -158,6 +158,7 @@
 import 
org.apache.asterix.runtime.evaluators.functions.FullTextContainsDescriptor;
 import 
org.apache.asterix.runtime.evaluators.functions.FullTextContainsWithoutOptionDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.GetItemDescriptor;
+import 
org.apache.asterix.runtime.evaluators.functions.GetJobParameterDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.GramTokensDescriptor;
 import 
org.apache.asterix.runtime.evaluators.functions.HashedGramTokensDescriptor;
 import 
org.apache.asterix.runtime.evaluators.functions.HashedWordTokensDescriptor;
@@ -426,6 +427,9 @@
         // Inject failure function
         temp.add(InjectFailureDescriptor.FACTORY);
 
+        // Get Job Parameter function
+        temp.add(GetJobParameterDescriptor.FACTORY);
+
         // Switch case
         temp.add(SwitchCaseDescriptor.FACTORY);
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index 95479c1..d183363 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -22,6 +22,8 @@
 import java.net.URL;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.hyracks.api.dataset.DatasetDirectoryRecord;
 import org.apache.hyracks.api.dataset.ResultSetId;
@@ -148,10 +150,10 @@
     public static class DestroyJobFunction extends Function {
         private static final long serialVersionUID = 1L;
 
-        private final JobId jobId;
+        private final long predestributedId;
 
-        public DestroyJobFunction(JobId jobId) {
-            this.jobId = jobId;
+        public DestroyJobFunction(long predestributedId) {
+            this.predestributedId = predestributedId;
         }
 
         @Override
@@ -159,8 +161,8 @@
             return FunctionId.DESTROY_JOB;
         }
 
-        public JobId getJobId() {
-            return jobId;
+        public long getPredistributedId() {
+            return predestributedId;
         }
     }
 
@@ -168,27 +170,30 @@
         private static final long serialVersionUID = 1L;
 
         private final byte[] acggfBytes;
-        private final EnumSet<JobFlag> jobFlags;
+        private final Set<JobFlag> jobFlags;
         private final DeploymentId deploymentId;
-        private final JobId jobId;
+        private final long predistributedId;
+        private final Map<byte[], byte[]> jobParameters;
 
-        public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, 
EnumSet<JobFlag> jobFlags, JobId jobId) {
+        public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, 
Set<JobFlag> jobFlags,
+                long predistributedId, Map<byte[], byte[]> jobParameters) {
             this.acggfBytes = acggfBytes;
             this.jobFlags = jobFlags;
             this.deploymentId = deploymentId;
-            this.jobId = jobId;
+            this.predistributedId = predistributedId;
+            this.jobParameters = jobParameters;
         }
 
-        public StartJobFunction(JobId jobId) {
-            this(null, null, EnumSet.noneOf(JobFlag.class), jobId);
+        public StartJobFunction(long jobId, Map<byte[], byte[]> jobParameters) 
{
+            this(null, null, EnumSet.noneOf(JobFlag.class), jobId, 
jobParameters);
         }
 
-        public StartJobFunction(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) {
-            this(null, acggfBytes, jobFlags, null);
+        public StartJobFunction(byte[] acggfBytes, Set<JobFlag> jobFlags) {
+            this(null, acggfBytes, jobFlags, -1, null);
         }
 
-        public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, 
EnumSet<JobFlag> jobFlags) {
-            this(deploymentId, acggfBytes, jobFlags, null);
+        public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, 
Set<JobFlag> jobFlags) {
+            this(deploymentId, acggfBytes, jobFlags, -1, null);
         }
 
         @Override
@@ -196,15 +201,19 @@
             return FunctionId.START_JOB;
         }
 
-        public JobId getJobId() {
-            return jobId;
+        public Map<byte[], byte[]> getJobParameters() {
+            return jobParameters;
+        }
+
+        public long getpredistributedId() {
+            return predistributedId;
         }
 
         public byte[] getACGGFBytes() {
             return acggfBytes;
         }
 
-        public EnumSet<JobFlag> getJobFlags() {
+        public Set<JobFlag> getJobFlags() {
             return jobFlags;
         }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index 0142c7d..bdb270c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -76,9 +76,9 @@
     }
 
     @Override
-    public JobId startJob(JobId jobId) throws Exception {
+    public JobId startJob(long predestributedId, Map<byte[], byte[]> 
jobParameters) throws Exception {
         HyracksClientInterfaceFunctions.StartJobFunction sjf =
-                new HyracksClientInterfaceFunctions.StartJobFunction(jobId);
+                new 
HyracksClientInterfaceFunctions.StartJobFunction(predestributedId, 
jobParameters);
         return (JobId) rpci.call(ipcHandle, sjf);
     }
 
@@ -90,17 +90,17 @@
     }
 
     @Override
-    public JobId distributeJob(byte[] acggfBytes) throws Exception {
+    public long distributeJob(byte[] acggfBytes) throws Exception {
         HyracksClientInterfaceFunctions.DistributeJobFunction sjf =
                 new 
HyracksClientInterfaceFunctions.DistributeJobFunction(acggfBytes);
-        return (JobId) rpci.call(ipcHandle, sjf);
+        return (long) rpci.call(ipcHandle, sjf);
     }
 
     @Override
-    public JobId destroyJob(JobId jobId) throws Exception {
+    public long destroyJob(long predestributedId) throws Exception {
         HyracksClientInterfaceFunctions.DestroyJobFunction sjf =
-                new HyracksClientInterfaceFunctions.DestroyJobFunction(jobId);
-        return (JobId) rpci.call(ipcHandle, sjf);
+                new 
HyracksClientInterfaceFunctions.DestroyJobFunction(predestributedId);
+        return (long) rpci.call(ipcHandle, sjf);
     }
 
     @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index 75cbf61..ae7960c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -108,27 +108,27 @@
     }
 
     @Override
-    public JobId distributeJob(JobSpecification jobSpec) throws Exception {
-        IActivityClusterGraphGeneratorFactory jsacggf =
+    public long distributeJob(JobSpecification jobSpec) throws Exception {
+        JobSpecificationActivityClusterGraphGeneratorFactory jsacggf =
                 new 
JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec);
         return distributeJob(jsacggf);
     }
 
     @Override
-    public JobId destroyJob(JobId jobId) throws Exception {
-        return hci.destroyJob(jobId);
+    public long destroyJob(long predestributedId) throws Exception {
+        return hci.destroyJob(predestributedId);
     }
 
     @Override
-    public JobId startJob(JobId jobId) throws Exception {
-        return hci.startJob(jobId);
+    public JobId startJob(long predestributedId, Map<byte[], byte[]> 
jobParameters) throws Exception {
+        return hci.startJob(predestributedId, jobParameters);
     }
 
     public JobId startJob(IActivityClusterGraphGeneratorFactory acggf, 
EnumSet<JobFlag> jobFlags) throws Exception {
         return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags);
     }
 
-    public JobId distributeJob(IActivityClusterGraphGeneratorFactory acggf) 
throws Exception {
+    public long distributeJob(IActivityClusterGraphGeneratorFactory acggf) 
throws Exception {
         return hci.distributeJob(JavaSerializationUtils.serialize(acggf));
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
index 0956d85..a29fe51 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
@@ -20,6 +20,7 @@
 
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.deployment.DeploymentId;
@@ -94,25 +95,27 @@
      *            Flags
      * @throws Exception
      */
-    public JobId distributeJob(JobSpecification jobSpec) throws Exception;
+    public long distributeJob(JobSpecification jobSpec) throws Exception;
 
     /**
      * Destroy the distributed graph for a pre-distributed job
      *
-     * @param jobId
+     * @param predestributedId
      *            The id of the predistributed job
      * @throws Exception
      */
-    public JobId destroyJob(JobId jobId) throws Exception;
+    public long destroyJob(long predestributedId) throws Exception;
 
     /**
      * Used to run a pre-distributed job by id (the same JobId will be 
returned)
      *
-     * @param jobId
+     * @param predestributedId
      *            The id of the predistributed job
+     * @param jobParameters
+     *            The serialized job parameters
      * @throws Exception
      */
-    public JobId startJob(JobId jobId) throws Exception;
+    public JobId startJob(long predestributedId, Map<byte[], byte[]> 
jobParameters) throws Exception;
 
     /**
      * Start the specified Job.
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
index 1afbe9e..6b303be 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
@@ -38,13 +38,13 @@
 
     public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws 
Exception;
 
-    public JobId startJob(JobId jobId) throws Exception;
+    public JobId startJob(long predestributedId, Map<byte[], byte[]> 
jobParameters) throws Exception;
 
     public void cancelJob(JobId jobId) throws Exception;
 
-    public JobId distributeJob(byte[] acggfBytes) throws Exception;
+    public long distributeJob(byte[] acggfBytes) throws Exception;
 
-    public JobId destroyJob(JobId jobId) throws Exception;
+    public long destroyJob(long predestributedId) throws Exception;
 
     public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception;
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java
index 7dd5fe9..0b2cc9b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java
@@ -28,7 +28,6 @@
 import java.util.logging.Logger;
 
 import org.apache.commons.lang3.tuple.Pair;
-
 import org.apache.hyracks.api.dataflow.ActivityId;
 import org.apache.hyracks.api.dataflow.IActivity;
 import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
@@ -36,7 +35,6 @@
 import org.apache.hyracks.api.job.ActivityClusterGraph;
 import org.apache.hyracks.api.job.ActivityClusterId;
 import org.apache.hyracks.api.job.JobActivityGraph;
-import org.apache.hyracks.api.job.JobId;
 
 public class ActivityClusterGraphBuilder {
     private static final Logger LOGGER = 
Logger.getLogger(ActivityClusterGraphBuilder.class.getName());
@@ -70,7 +68,7 @@
         return null;
     }
 
-    public ActivityClusterGraph inferActivityClusters(JobId jobId, 
JobActivityGraph jag) {
+    public ActivityClusterGraph inferActivityClusters(JobActivityGraph jag) {
         /*
          * Build initial equivalence sets map. We create a map such that for 
each IOperatorTask, t -> { t }
          */
@@ -99,7 +97,7 @@
         Map<ActivityId, IActivity> activityNodeMap = jag.getActivityMap();
         List<ActivityCluster> acList = new ArrayList<ActivityCluster>();
         for (Set<ActivityId> stage : stages) {
-            ActivityCluster ac = new ActivityCluster(acg, new 
ActivityClusterId(jobId, acCounter++));
+            ActivityCluster ac = new ActivityCluster(acg, new 
ActivityClusterId(acCounter++));
             acList.add(ac);
             for (ActivityId aid : stage) {
                 IActivity activity = activityNodeMap.get(aid);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
index c712b36..ddf0ce8 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
@@ -32,7 +32,6 @@
 import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import org.apache.hyracks.api.job.JobActivityGraph;
 import org.apache.hyracks.api.job.JobFlag;
-import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.rewriter.ActivityClusterGraphRewriter;
 
@@ -51,8 +50,8 @@
     }
 
     @Override
-    public IActivityClusterGraphGenerator 
createActivityClusterGraphGenerator(JobId jobId,
-            final ICCServiceContext ccServiceCtx, Set<JobFlag> jobFlags) 
throws HyracksException {
+    public IActivityClusterGraphGenerator 
createActivityClusterGraphGenerator(final ICCServiceContext ccServiceCtx,
+            Set<JobFlag> jobFlags) throws HyracksException {
         final JobActivityGraphBuilder builder = new 
JobActivityGraphBuilder(spec, jobFlags);
         PlanUtils.visit(spec, new IConnectorDescriptorVisitor() {
             @Override
@@ -70,7 +69,7 @@
         final JobActivityGraph jag = builder.getActivityGraph();
         ActivityClusterGraphBuilder acgb = new ActivityClusterGraphBuilder();
 
-        final ActivityClusterGraph acg = acgb.inferActivityClusters(jobId, 
jag);
+        final ActivityClusterGraph acg = acgb.inferActivityClusters(jag);
         acg.setFrameSize(spec.getFrameSize());
         acg.setMaxReattempts(spec.getMaxReattempts());
         
acg.setJobletEventListenerFactory(spec.getJobletEventListenerFactory());
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
index df693b2..4358cbe 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
@@ -25,6 +25,7 @@
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.io.IWorkspaceFileFactory;
 import org.apache.hyracks.api.job.IOperatorEnvironment;
 import org.apache.hyracks.api.job.JobFlag;
@@ -51,5 +52,7 @@
 
     Object getSharedObject();
 
+    public byte[] getJobParameter(byte[] name, int start, int length) throws 
HyracksException;
+
     Set<JobFlag> getJobFlags();
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterGraph.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterGraph.java
index b64e2d5..8432a6f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterGraph.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterGraph.java
@@ -24,14 +24,15 @@
 import java.util.List;
 import java.util.Map;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.hyracks.api.dataflow.ActivityId;
 import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
 import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
 import 
org.apache.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
 public class ActivityClusterGraph implements Serializable {
     private static final long serialVersionUID = 1L;
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterId.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterId.java
index e0c5279..46ebf4d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterId.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterId.java
@@ -23,17 +23,10 @@
 public final class ActivityClusterId implements Serializable {
     private static final long serialVersionUID = 1L;
 
-    private final JobId jobId;
-
     private final int id;
 
-    public ActivityClusterId(JobId jobId, int id) {
-        this.jobId = jobId;
+    public ActivityClusterId(int id) {
         this.id = id;
-    }
-
-    public JobId getJobId() {
-        return jobId;
     }
 
     public int getId() {
@@ -45,7 +38,6 @@
         final int prime = 31;
         int result = 1;
         result = prime * result + id;
-        result = prime * result + ((jobId == null) ? 0 : jobId.hashCode());
         return result;
     }
 
@@ -64,18 +56,11 @@
         if (id != other.id) {
             return false;
         }
-        if (jobId == null) {
-            if (other.jobId != null) {
-                return false;
-            }
-        } else if (!jobId.equals(other.jobId)) {
-            return false;
-        }
         return true;
     }
 
     @Override
     public String toString() {
-        return "ACID:" + jobId + ":" + id;
+        return "ACID:" + ":" + id;
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
index 133e342..d23b944 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
@@ -25,7 +25,7 @@
 import org.apache.hyracks.api.exceptions.HyracksException;
 
 public interface IActivityClusterGraphGeneratorFactory extends Serializable {
-    public IActivityClusterGraphGenerator 
createActivityClusterGraphGenerator(JobId jobId,
+    public IActivityClusterGraphGenerator createActivityClusterGraphGenerator(
             ICCServiceContext ccServiceCtx, Set<JobFlag> jobFlags) throws 
HyracksException;
 
     public JobSpecification getJobSpecification();
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobParameterByteStore.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobParameterByteStore.java
new file mode 100644
index 0000000..048db5e
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobParameterByteStore.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.job;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+public class JobParameterByteStore implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private Map<byte[], byte[]> vars;
+    private Map<String, byte[]> nonpureSingletonValues;
+    private final byte[] empty = new byte[0];
+
+    public JobParameterByteStore() {
+        vars = new HashMap<>();
+    }
+
+    public Map<byte[], byte[]> getParameterMap() {
+        return vars;
+    }
+
+    public void setParameters(Map<byte[], byte[]> map) {
+        vars = map;
+    }
+
+    public byte[] getParameterValue(byte[] name, int start, int length) {
+        for (Entry<byte[], byte[]> entry : vars.entrySet()) {
+            byte[] key = entry.getKey();
+            if (key.length == length) {
+                boolean matched = true;
+                for (int j = 0; j < length; j++) {
+                    if (key[j] != name[j + start]) {
+                        matched = false;
+                        break;
+                    }
+                }
+                if (matched) {
+                    return entry.getValue();
+                }
+            }
+        }
+        return empty;
+    }
+
+    public synchronized byte[] getNonpureSingletonValue(String functionName) {
+        byte[] value = nonpureSingletonValues.get(functionName);
+        if (value == null) {
+            nonpureSingletonValues.put(functionName, value);
+        }
+        return value;
+    }
+
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
index 327c422..2e4bbaa 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -53,6 +53,7 @@
     private static final Logger LOGGER = 
Logger.getLogger(ClientInterfaceIPCI.class.getName());
     private final ClusterControllerService ccs;
     private final JobIdFactory jobIdFactory;
+    private long predistributedId = 0;
 
     ClientInterfaceIPCI(ClusterControllerService ccs, JobIdFactory 
jobIdFactory) {
         this.ccs = ccs;
@@ -85,14 +86,14 @@
             case DISTRIBUTE_JOB:
                 HyracksClientInterfaceFunctions.DistributeJobFunction djf =
                         
(HyracksClientInterfaceFunctions.DistributeJobFunction) fn;
-                ccs.getWorkQueue().schedule(new DistributeJobWork(ccs, 
djf.getACGGFBytes(), jobIdFactory,
-                        new IPCResponder<JobId>(handle, mid)));
+                ccs.getWorkQueue().schedule(new DistributeJobWork(ccs, 
djf.getACGGFBytes(), predistributedId++,
+                         new IPCResponder<Long>(handle, mid)));
                 break;
             case DESTROY_JOB:
                 HyracksClientInterfaceFunctions.DestroyJobFunction dsjf =
                         (HyracksClientInterfaceFunctions.DestroyJobFunction) 
fn;
-                ccs.getWorkQueue()
-                        .schedule(new DestroyJobWork(ccs, dsjf.getJobId(), new 
IPCResponder<JobId>(handle, mid)));
+                ccs.getWorkQueue().schedule(
+                        new DestroyJobWork(ccs, dsjf.getPredistributedId(), 
new IPCResponder<Long>(handle, mid)));
                 break;
             case CANCEL_JOB:
                 HyracksClientInterfaceFunctions.CancelJobFunction cjf =
@@ -103,8 +104,14 @@
             case START_JOB:
                 HyracksClientInterfaceFunctions.StartJobFunction sjf =
                         (HyracksClientInterfaceFunctions.StartJobFunction) fn;
-                ccs.getWorkQueue().schedule(new JobStartWork(ccs, 
sjf.getDeploymentId(), sjf.getACGGFBytes(),
-                        sjf.getJobFlags(), sjf.getJobId(), new 
IPCResponder<JobId>(handle, mid), jobIdFactory));
+                long id = sjf.getpredistributedId();
+                byte[] acggfBytes = null;
+                if (id == -1) {
+                    //The job is new
+                    acggfBytes = sjf.getACGGFBytes();
+                }
+                ccs.getWorkQueue().schedule(new JobStartWork(ccs, 
sjf.getDeploymentId(), acggfBytes, sjf.getJobFlags(),
+                        jobIdFactory, sjf.getJobParameters(), new 
IPCResponder<JobId>(handle, mid), id));
                 break;
             case GET_DATASET_DIRECTORY_SERIVICE_INFO:
                 ccs.getWorkQueue().schedule(
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
index 350984c..edae49f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -102,7 +102,8 @@
             case DISTRIBUTED_JOB_FAILURE:
                 CCNCFunctions.ReportDistributedJobFailureFunction rdjf =
                         (CCNCFunctions.ReportDistributedJobFailureFunction) fn;
-                ccs.getWorkQueue().schedule(new 
DistributedJobFailureWork(rdjf.getJobId(), rdjf.getNodeId()));
+                ccs.getWorkQueue()
+                        .schedule(new 
DistributedJobFailureWork(rdjf.getPredistributedId(), rdjf.getNodeId()));
                 break;
             case REGISTER_PARTITION_PROVIDER:
                 CCNCFunctions.RegisterPartitionProviderFunction rppf =
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index dfc79ed..7a8a4ae 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -50,7 +50,9 @@
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobIdFactory;
+import org.apache.hyracks.api.job.JobParameterByteStore;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.api.topology.ClusterTopology;
@@ -109,6 +111,8 @@
 
     private final PreDistributedJobStore preDistributedJobStore = new 
PreDistributedJobStore();
 
+    private final Map<JobId, JobParameterByteStore> jobParameterByteStoreMap = 
new HashMap<>();
+
     private final WorkQueue workQueue;
 
     private ExecutorService executor;
@@ -164,8 +168,8 @@
         final ClusterTopology topology = computeClusterTopology(ccConfig);
         ccContext = new ClusterControllerContext(topology);
         sweeper = new DeadNodeSweeper();
-        datasetDirectoryService = new 
DatasetDirectoryService(ccConfig.getResultTTL(),
-                ccConfig.getResultSweepThreshold(), preDistributedJobStore);
+        datasetDirectoryService =
+                new DatasetDirectoryService(ccConfig.getResultTTL(), 
ccConfig.getResultSweepThreshold());
 
         deploymentRunMap = new HashMap<>();
         stateDumpRunMap = new HashMap<>();
@@ -352,6 +356,19 @@
         return preDistributedJobStore;
     }
 
+    public void removeJobParameterByteStore(JobId jobId) throws 
HyracksException {
+        jobParameterByteStoreMap.remove(jobId);
+    }
+
+    public JobParameterByteStore createOrGetJobParameterByteStore(JobId jobId) 
throws HyracksException {
+        JobParameterByteStore jpbs = jobParameterByteStoreMap.get(jobId);
+        if (jpbs == null) {
+            jpbs = new JobParameterByteStore();
+            jobParameterByteStoreMap.put(jobId, jpbs);
+        }
+        return jpbs;
+    }
+
     public IResourceManager getResourceManager() {
         return resourceManager;
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java
index 117621f..9e5a1a7 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java
@@ -26,52 +26,47 @@
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.ActivityClusterGraph;
-import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 
 public class PreDistributedJobStore {
 
-    private final Map<JobId, PreDistributedJobDescriptor> 
preDistributedJobDescriptorMap;
+    private final Map<Long, PreDistributedJobDescriptor> 
preDistributedJobDescriptorMap;
 
     public PreDistributedJobStore() {
         preDistributedJobDescriptorMap = new Hashtable<>();
     }
 
-    public void addDistributedJobDescriptor(JobId jobId, ActivityClusterGraph 
activityClusterGraph,
+    public void addDistributedJobDescriptor(long predestributedId, 
ActivityClusterGraph activityClusterGraph,
             JobSpecification jobSpecification, Set<Constraint> 
activityClusterGraphConstraints)
                     throws HyracksException {
-        if (preDistributedJobDescriptorMap.get(jobId) != null) {
-            throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, 
jobId);
+        if (preDistributedJobDescriptorMap.get(predestributedId) != null) {
+            throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, 
predestributedId);
         }
         PreDistributedJobDescriptor descriptor =
                 new PreDistributedJobDescriptor(activityClusterGraph, 
jobSpecification, activityClusterGraphConstraints);
-        preDistributedJobDescriptorMap.put(jobId, descriptor);
+        preDistributedJobDescriptorMap.put(predestributedId, descriptor);
     }
 
-    public void checkForExistingDistributedJobDescriptor(JobId jobId) throws 
HyracksException {
-        if (preDistributedJobDescriptorMap.get(jobId) != null) {
-            throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, 
jobId);
+    public void checkForExistingDistributedJobDescriptor(long 
predestributedId) throws HyracksException {
+        if (preDistributedJobDescriptorMap.get(predestributedId) != null) {
+            throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, 
predestributedId);
         }
     }
 
-    public PreDistributedJobDescriptor getDistributedJobDescriptor(JobId 
jobId) throws HyracksException {
-        PreDistributedJobDescriptor descriptor = 
preDistributedJobDescriptorMap.get(jobId);
+    public PreDistributedJobDescriptor getDistributedJobDescriptor(long 
predestributedId) throws HyracksException {
+        PreDistributedJobDescriptor descriptor = 
preDistributedJobDescriptorMap.get(predestributedId);
         if (descriptor == null) {
-            throw 
HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId);
+            throw 
HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, 
predestributedId);
         }
         return descriptor;
     }
 
-    public boolean jobIsPredistributed(JobId jobId) {
-        return preDistributedJobDescriptorMap.get(jobId) != null;
-    }
-
-    public void removeDistributedJobDescriptor(JobId jobId) throws 
HyracksException {
-        PreDistributedJobDescriptor descriptor = 
preDistributedJobDescriptorMap.get(jobId);
+    public void removeDistributedJobDescriptor(long predestributedId) throws 
HyracksException {
+        PreDistributedJobDescriptor descriptor = 
preDistributedJobDescriptorMap.get(predestributedId);
         if (descriptor == null) {
-            throw 
HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId);
+            throw 
HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, 
predestributedId);
         }
-        preDistributedJobDescriptorMap.remove(jobId);
+        preDistributedJobDescriptorMap.remove(predestributedId);
     }
 
     public class PreDistributedJobDescriptor {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 7a9306c..a2b667c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -42,7 +42,6 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
-import org.apache.hyracks.control.cc.PreDistributedJobStore;
 import org.apache.hyracks.control.common.dataset.ResultStateSweeper;
 import org.apache.hyracks.control.common.work.IResultCallback;
 
@@ -63,14 +62,10 @@
 
     private final Map<JobId, JobResultInfo> jobResultLocations;
 
-    private final PreDistributedJobStore preDistributedJobStore;
-
-    public DatasetDirectoryService(long resultTTL, long resultSweepThreshold,
-            PreDistributedJobStore preDistributedJobStore) {
+    public DatasetDirectoryService(long resultTTL, long resultSweepThreshold) {
         this.resultTTL = resultTTL;
         this.resultSweepThreshold = resultSweepThreshold;
-        this.preDistributedJobStore = preDistributedJobStore;
-        jobResultLocations = new LinkedHashMap<>();
+        jobResultLocations = new LinkedHashMap<JobId, JobResultInfo>();
     }
 
     @Override
@@ -186,9 +181,6 @@
 
     @Override
     public synchronized long getResultTimestamp(JobId jobId) {
-        if (preDistributedJobStore.jobIsPredistributed(jobId)) {
-            return -1;
-        }
         return getState(jobId).getTimestamp();
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index a3078b6..d4214c4 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -77,7 +77,7 @@
 
     private final PartitionConstraintSolver solver;
 
-    private final boolean predistributed;
+    private final long predistributedId;
 
     private final Map<PartitionId, TaskCluster> 
partitionProducingTaskClusterMap;
 
@@ -88,10 +88,10 @@
     private boolean cancelled = false;
 
     public JobExecutor(ClusterControllerService ccs, JobRun jobRun, 
Collection<Constraint> constraints,
-            boolean predistributed) {
+            long predistributedId) {
         this.ccs = ccs;
         this.jobRun = jobRun;
-        this.predistributed = predistributed;
+        this.predistributedId = predistributedId;
         solver = new PartitionConstraintSolver();
         partitionProducingTaskClusterMap = new HashMap<>();
         inProgressTaskClusters = new HashSet<>();
@@ -100,7 +100,7 @@
     }
 
     public boolean isPredistributed() {
-        return predistributed;
+        return predistributedId != -1;
     }
 
     public JobRun getJobRun() {
@@ -503,7 +503,7 @@
                 new HashMap<>(jobRun.getConnectorPolicyMap());
         INodeManager nodeManager = ccs.getNodeManager();
         try {
-            byte[] acgBytes = predistributed ? null : 
JavaSerializationUtils.serialize(acg);
+            byte[] acgBytes = isPredistributed() ? null : 
JavaSerializationUtils.serialize(acg);
             for (Map.Entry<String, List<TaskAttemptDescriptor>> entry : 
taskAttemptMap.entrySet()) {
                 String nodeId = entry.getKey();
                 final List<TaskAttemptDescriptor> taskDescriptors = 
entry.getValue();
@@ -516,7 +516,8 @@
                     }
                     byte[] jagBytes = changed ? acgBytes : null;
                     node.getNodeController().startTasks(deploymentId, jobId, 
jagBytes, taskDescriptors,
-                            connectorPolicies, jobRun.getFlags());
+                            connectorPolicies, jobRun.getFlags(),
+                            
ccs.createOrGetJobParameterByteStore(jobId).getParameterMap(), 
predistributedId);
                 }
             }
         } catch (Exception e) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index c6d90a7..66f4a96 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -142,6 +142,7 @@
 
     @Override
     public void prepareComplete(JobRun run, JobStatus status, List<Exception> 
exceptions) throws HyracksException {
+        ccs.removeJobParameterByteStore(run.getJobId());
         checkJob(run);
         if (status == JobStatus.FAILURE_BEFORE_EXECUTION) {
             run.setPendingStatus(JobStatus.FAILURE, exceptions);
@@ -301,9 +302,7 @@
 
         CCServiceContext serviceCtx = ccs.getContext();
         JobSpecification spec = run.getJobSpecification();
-        if (!run.getExecutor().isPredistributed()) {
-            serviceCtx.notifyJobCreation(jobId, spec);
-        }
+        serviceCtx.notifyJobCreation(jobId, spec);
         run.setStatus(JobStatus.RUNNING, null);
         executeJobInternal(run);
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
index 95a6d9b..641a636 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
@@ -116,19 +116,21 @@
 
     //Run a Pre-distributed job by passing the JobId
     public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, 
JobId jobId, Set<JobFlag> jobFlags,
-            PreDistributedJobDescriptor distributedJobDescriptor)
+            PreDistributedJobDescriptor distributedJobDescriptor, Map<byte[], 
byte[]> jobParameters,
+            long predestributedId)
             throws HyracksException {
         this(deploymentId, jobId, jobFlags,
                 distributedJobDescriptor.getJobSpecification(), 
distributedJobDescriptor.getActivityClusterGraph());
+        
ccs.createOrGetJobParameterByteStore(jobId).setParameters(jobParameters);
         Set<Constraint> constaints = 
distributedJobDescriptor.getActivityClusterGraphConstraints();
-        this.scheduler = new JobExecutor(ccs, this, constaints, true);
+        this.scheduler = new JobExecutor(ccs, this, constaints, 
predestributedId);
     }
 
     //Run a new job by creating an ActivityClusterGraph
     public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, 
JobId jobId,
             IActivityClusterGraphGeneratorFactory acggf, 
IActivityClusterGraphGenerator acgg, Set<JobFlag> jobFlags) {
         this(deploymentId, jobId, jobFlags, acggf.getJobSpecification(), 
acgg.initialize());
-        this.scheduler = new JobExecutor(ccs, this, acgg.getConstraints(), 
false);
+        this.scheduler = new JobExecutor(ccs, this, acgg.getConstraints(), -1);
     }
 
     public DeploymentId getDeploymentId() {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java
index df98252..66cfa7f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java
@@ -18,7 +18,6 @@
  */
 package org.apache.hyracks.control.cc.work;
 
-import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
 import org.apache.hyracks.control.cc.cluster.INodeManager;
@@ -27,11 +26,11 @@
 
 public class DestroyJobWork extends SynchronizableWork {
     private final ClusterControllerService ccs;
-    private final JobId jobId;
-    private final IResultCallback<JobId> callback;
+    private final long predestributedId;
+    private final IResultCallback<Long> callback;
 
-    public DestroyJobWork(ClusterControllerService ccs, JobId jobId, 
IResultCallback<JobId> callback) {
-        this.jobId = jobId;
+    public DestroyJobWork(ClusterControllerService ccs, long predestributedId, 
IResultCallback<Long> callback) {
+        this.predestributedId = predestributedId;
         this.ccs = ccs;
         this.callback = callback;
     }
@@ -39,12 +38,12 @@
     @Override
     protected void doRun() throws Exception {
         try {
-            
ccs.getPreDistributedJobStore().removeDistributedJobDescriptor(jobId);
+            
ccs.getPreDistributedJobStore().removeDistributedJobDescriptor(predestributedId);
             INodeManager nodeManager = ccs.getNodeManager();
             for (NodeControllerState node : 
nodeManager.getAllNodeControllerStates()) {
-                node.getNodeController().destroyJob(jobId);
+                node.getNodeController().destroyJob(predestributedId);
             }
-            callback.setValue(jobId);
+            callback.setValue(predestributedId);
         } catch (Exception e) {
             callback.setException(e);
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
index 5a57b1b..e9985bf 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
@@ -24,8 +24,6 @@
 import org.apache.hyracks.api.job.IActivityClusterGraphGenerator;
 import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import org.apache.hyracks.api.job.JobFlag;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobIdFactory;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
@@ -38,12 +36,12 @@
 public class DistributeJobWork extends SynchronizableWork {
     private final ClusterControllerService ccs;
     private final byte[] acggfBytes;
-    private final JobIdFactory jobIdFactory;
-    private final IResultCallback<JobId> callback;
+    private final long predestributedId;
+    private final IResultCallback<Long> callback;
 
-    public DistributeJobWork(ClusterControllerService ccs, byte[] acggfBytes, 
JobIdFactory jobIdFactory,
-            IResultCallback<JobId> callback) {
-        this.jobIdFactory = jobIdFactory;
+    public DistributeJobWork(ClusterControllerService ccs, byte[] acggfBytes, 
long predestributedId,
+            IResultCallback<Long> callback) {
+        this.predestributedId = predestributedId;
         this.ccs = ccs;
         this.acggfBytes = acggfBytes;
         this.callback = callback;
@@ -52,27 +50,25 @@
     @Override
     protected void doRun() throws Exception {
         try {
-            JobId jobId = jobIdFactory.create();
             final CCServiceContext ccServiceCtx = ccs.getContext();
-            
ccs.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(jobId);
+            
ccs.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(predestributedId);
             IActivityClusterGraphGeneratorFactory acggf =
                     (IActivityClusterGraphGeneratorFactory) 
DeploymentUtils.deserialize(acggfBytes, null, ccServiceCtx);
             IActivityClusterGraphGenerator acgg =
-                    acggf.createActivityClusterGraphGenerator(jobId, 
ccServiceCtx, EnumSet.noneOf(JobFlag.class));
+                    acggf.createActivityClusterGraphGenerator(ccServiceCtx, 
EnumSet.noneOf(JobFlag.class));
             ActivityClusterGraph acg = acgg.initialize();
-            ccs.getPreDistributedJobStore().addDistributedJobDescriptor(jobId, 
acg, acggf.getJobSpecification(),
+            
ccs.getPreDistributedJobStore().addDistributedJobDescriptor(predestributedId, 
acg,
+                    acggf.getJobSpecification(),
                     acgg.getConstraints());
-
-            ccServiceCtx.notifyJobCreation(jobId, acggf.getJobSpecification());
 
             byte[] acgBytes = JavaSerializationUtils.serialize(acg);
 
             INodeManager nodeManager = ccs.getNodeManager();
             for (NodeControllerState node : 
nodeManager.getAllNodeControllerStates()) {
-                node.getNodeController().distributeJob(jobId, acgBytes);
+                node.getNodeController().distributeJob(predestributedId, 
acgBytes);
             }
 
-            callback.setValue(jobId);
+            callback.setValue(predestributedId);
         } catch (Exception e) {
             callback.setException(e);
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributedJobFailureWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributedJobFailureWork.java
index f7fa2a4..db4b35a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributedJobFailureWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributedJobFailureWork.java
@@ -20,20 +20,19 @@
 
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
 
 public class DistributedJobFailureWork extends SynchronizableWork {
-    protected final JobId jobId;
+    protected final long predistributedId;
     protected final String nodeId;
 
-    public DistributedJobFailureWork(JobId jobId, String nodeId) {
-        this.jobId = jobId;
+    public DistributedJobFailureWork(long predistributedId, String nodeId) {
+        this.predistributedId = predistributedId;
         this.nodeId = nodeId;
     }
 
     @Override
     public void doRun() throws HyracksException {
-        throw HyracksException.create(ErrorCode.DISTRIBUTED_JOB_FAILURE, 
jobId, nodeId);
+        throw HyracksException.create(ErrorCode.DISTRIBUTED_JOB_FAILURE, 
predistributedId, nodeId);
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
index ed82705..d5a2dde 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
@@ -18,7 +18,8 @@
  */
 package org.apache.hyracks.control.cc.work;
 
-import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.job.IActivityClusterGraphGenerator;
@@ -37,20 +38,24 @@
 public class JobStartWork extends SynchronizableWork {
     private final ClusterControllerService ccs;
     private final byte[] acggfBytes;
-    private final EnumSet<JobFlag> jobFlags;
+    private final Set<JobFlag> jobFlags;
     private final DeploymentId deploymentId;
-    private final JobId preDistributedJobId;
     private final IResultCallback<JobId> callback;
     private final JobIdFactory jobIdFactory;
+    private final long predestributedId;
+    private final Map<byte[], byte[]> jobParameters;
 
     public JobStartWork(ClusterControllerService ccs, DeploymentId 
deploymentId, byte[] acggfBytes,
-            EnumSet<JobFlag> jobFlags, JobId jobId, IResultCallback<JobId> 
callback, JobIdFactory jobIdFactory) {
+            Set<JobFlag> jobFlags, JobIdFactory jobIdFactory, Map<byte[], 
byte[]> jobParameters,
+ IResultCallback<JobId> callback,
+            long predestributedId) {
         this.deploymentId = deploymentId;
-        this.preDistributedJobId = jobId;
         this.ccs = ccs;
         this.acggfBytes = acggfBytes;
         this.jobFlags = jobFlags;
         this.callback = callback;
+        this.predestributedId = predestributedId;
+        this.jobParameters = jobParameters;
         this.jobIdFactory = jobIdFactory;
     }
 
@@ -61,19 +66,18 @@
             final CCServiceContext ccServiceCtx = ccs.getContext();
             JobId jobId;
             JobRun run;
-            if (preDistributedJobId == null) {
-                jobId = jobIdFactory.create();
+            jobId = jobIdFactory.create();
+            if (predestributedId == -1) {
                 //Need to create the ActivityClusterGraph
                 IActivityClusterGraphGeneratorFactory acggf = 
(IActivityClusterGraphGeneratorFactory) DeploymentUtils
                         .deserialize(acggfBytes, deploymentId, ccServiceCtx);
-                IActivityClusterGraphGenerator acgg =
-                        acggf.createActivityClusterGraphGenerator(jobId, 
ccServiceCtx, jobFlags);
+                IActivityClusterGraphGenerator acgg = 
acggf.createActivityClusterGraphGenerator(ccServiceCtx, jobFlags);
                 run = new JobRun(ccs, deploymentId, jobId, acggf, acgg, 
jobFlags);
             } else {
-                jobId = preDistributedJobId;
                 //ActivityClusterGraph has already been distributed
                 run = new JobRun(ccs, deploymentId, jobId, jobFlags,
-                        
ccs.getPreDistributedJobStore().getDistributedJobDescriptor(jobId));
+                        
ccs.getPreDistributedJobStore().getDistributedJobDescriptor(predestributedId), 
jobParameters,
+                        predestributedId);
             }
             jobManager.add(run);
             callback.setValue(jobId);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index ec8e045..813836a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -44,7 +44,7 @@
     public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String 
nodeId, List<Exception> exceptions)
             throws Exception;
 
-    public void notifyDistributedJobFailure(JobId jobId, String nodeId) throws 
Exception;
+    public void notifyDistributedJobFailure(long predestributedId, String 
nodeId) throws Exception;
 
     public void notifyJobletCleanup(JobId jobId, String nodeId) throws 
Exception;
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index a10f8f0..a3d7719 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -19,7 +19,6 @@
 package org.apache.hyracks.control.common.base;
 
 import java.net.URL;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -38,7 +37,7 @@
 public interface INodeController {
     public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] 
planBytes,
             List<TaskAttemptDescriptor> taskDescriptors, 
Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies,
-            Set<JobFlag> flags) throws Exception;
+            Set<JobFlag> flags, Map<byte[], byte[]> jobParameters, long 
predistributedId) throws Exception;
 
     public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws 
Exception;
 
@@ -50,9 +49,9 @@
 
     public void undeployBinary(DeploymentId deploymentId) throws Exception;
 
-    public void distributeJob(JobId jobId, byte[] planBytes) throws Exception;
+    public void distributeJob(long predistributedId, byte[] planBytes) throws 
Exception;
 
-    public void destroyJob(JobId jobId) throws Exception;
+    public void destroyJob(long predestributedId) throws Exception;
 
     public void dumpState(String stateDumpId) throws Exception;
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index d42c4a8..7e40fbf 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -288,11 +288,11 @@
     public static class ReportDistributedJobFailureFunction extends Function {
         private static final long serialVersionUID = 1L;
 
-        private final JobId jobId;
+        private final long predestributedId;
         private final String nodeId;
 
-        public ReportDistributedJobFailureFunction(JobId jobId, String nodeId) 
{
-            this.jobId = jobId;
+        public ReportDistributedJobFailureFunction(long predestributedId, 
String nodeId) {
+            this.predestributedId = predestributedId;
             this.nodeId = nodeId;
         }
 
@@ -301,8 +301,8 @@
             return FunctionId.DISTRIBUTED_JOB_FAILURE;
         }
 
-        public JobId getJobId() {
-            return jobId;
+        public long getPredistributedId() {
+            return predestributedId;
         }
 
         public String getNodeId() {
@@ -668,12 +668,12 @@
     public static class DistributeJobFunction extends Function {
         private static final long serialVersionUID = 1L;
 
-        private final JobId jobId;
+        private final long predistributedId;
 
         private final byte[] acgBytes;
 
-        public DistributeJobFunction(JobId jobId, byte[] acgBytes) {
-            this.jobId = jobId;
+        public DistributeJobFunction(long predistributedId, byte[] acgBytes) {
+            this.predistributedId = predistributedId;
             this.acgBytes = acgBytes;
         }
 
@@ -682,8 +682,8 @@
             return FunctionId.DISTRIBUTE_JOB;
         }
 
-        public JobId getJobId() {
-            return jobId;
+        public long getPredistributedId() {
+            return predistributedId;
         }
 
         public byte[] getacgBytes() {
@@ -694,10 +694,10 @@
     public static class DestroyJobFunction extends Function {
         private static final long serialVersionUID = 1L;
 
-        private final JobId jobId;
+        private final long predistributedId;
 
-        public DestroyJobFunction(JobId jobId) {
-            this.jobId = jobId;
+        public DestroyJobFunction(long predistributedId) {
+            this.predistributedId = predistributedId;
         }
 
         @Override
@@ -705,8 +705,8 @@
             return FunctionId.DESTROY_JOB;
         }
 
-        public JobId getJobId() {
-            return jobId;
+        public long getPredistributedId() {
+            return predistributedId;
         }
     }
 
@@ -719,16 +719,21 @@
         private final List<TaskAttemptDescriptor> taskDescriptors;
         private final Map<ConnectorDescriptorId, IConnectorPolicy> 
connectorPolicies;
         private final Set<JobFlag> flags;
+        private final Map<byte[], byte[]> jobParameters;
+        private final long predistributedId;
 
         public StartTasksFunction(DeploymentId deploymentId, JobId jobId, 
byte[] planBytes,
                 List<TaskAttemptDescriptor> taskDescriptors,
-                Map<ConnectorDescriptorId, IConnectorPolicy> 
connectorPolicies, Set<JobFlag> flags) {
+                Map<ConnectorDescriptorId, IConnectorPolicy> 
connectorPolicies, Set<JobFlag> flags,
+                Map<byte[], byte[]> jobParameters, long predistributedId) {
             this.deploymentId = deploymentId;
             this.jobId = jobId;
             this.planBytes = planBytes;
             this.taskDescriptors = taskDescriptors;
             this.connectorPolicies = connectorPolicies;
             this.flags = flags;
+            this.jobParameters = jobParameters;
+            this.predistributedId = predistributedId;
         }
 
         @Override
@@ -740,8 +745,16 @@
             return deploymentId;
         }
 
+        public long getPredistributedId() {
+            return predistributedId;
+        }
+
         public JobId getJobId() {
             return jobId;
+        }
+
+        public Map<byte[], byte[]> getJobParameters() {
+            return jobParameters;
         }
 
         public byte[] getPlanBytes() {
@@ -804,7 +817,29 @@
                 flags.add(JobFlag.values()[(dis.readInt())]);
             }
 
-            return new StartTasksFunction(deploymentId, jobId, planBytes, 
taskDescriptors, connectorPolicies, flags);
+            // read job parameters
+            int runTimeVarsSize = dis.readInt();
+            Map<byte[], byte[]> jobParameters = new HashMap<>();
+            for (int i = 0; i < runTimeVarsSize; i++) {
+                int nameLength = dis.readInt();
+                byte[] nameBytes = null;
+                if (nameLength >= 0) {
+                    nameBytes = new byte[nameLength];
+                    dis.read(nameBytes, 0, nameLength);
+                }
+                int valueLength = dis.readInt();
+                byte[] valueBytes = null;
+                if (valueLength >= 0) {
+                    valueBytes = new byte[valueLength];
+                    dis.read(valueBytes, 0, valueLength);
+                }
+                jobParameters.put(nameBytes, valueBytes);
+            }
+
+            long predistributedId = dis.readLong();
+
+            return new StartTasksFunction(deploymentId, jobId, planBytes, 
taskDescriptors, connectorPolicies, flags,
+                    jobParameters, predistributedId);
         }
 
         public static void serialize(OutputStream out, Object object) throws 
Exception {
@@ -842,6 +877,19 @@
             for (JobFlag flag : fn.flags) {
                 dos.writeInt(flag.ordinal());
             }
+
+            //write job parameters
+            dos.writeInt(fn.jobParameters.size());
+            for (Entry<byte[], byte[]> entry : fn.jobParameters.entrySet()) {
+                dos.writeInt(entry.getKey().length);
+                dos.write(entry.getKey(), 0, entry.getKey().length);
+                dos.writeInt(entry.getValue().length);
+                dos.write(entry.getValue(), 0, entry.getValue().length);
+            }
+
+            //write predistributed id
+            dos.writeLong(fn.predistributedId);
+
         }
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 4707487..f0443b5 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -18,8 +18,6 @@
  */
 package org.apache.hyracks.control.common.ipc;
 
-import static org.apache.hyracks.control.common.ipc.CCNCFunctions.*;
-
 import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.logging.Logger;
@@ -33,6 +31,24 @@
 import org.apache.hyracks.control.common.controllers.NodeRegistration;
 import org.apache.hyracks.control.common.deployment.DeploymentStatus;
 import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.GetNodeControllersInfoFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.NodeHeartbeatFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.NotifyDeployBinaryFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.NotifyJobletCleanupFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.NotifyTaskCompleteFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.NotifyTaskFailureFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.RegisterNodeFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.RegisterPartitionProviderFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.RegisterPartitionRequestFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.RegisterResultPartitionLocationFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.ReportDistributedJobFailureFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.ReportProfileFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.ReportResultPartitionWriteCompletionFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.SendApplicationMessageFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.ShutdownResponseFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.StateDumpResponseFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.ThreadDumpResponseFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.UnregisterNodeFunction;
 import org.apache.hyracks.control.common.job.PartitionDescriptor;
 import org.apache.hyracks.control.common.job.PartitionRequest;
 import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
@@ -150,9 +166,8 @@
     }
 
     @Override
-    public void notifyDistributedJobFailure(JobId jobId, String nodeId) throws 
Exception {
-        ReportDistributedJobFailureFunction fn = new 
ReportDistributedJobFailureFunction(
-                jobId, nodeId);
+    public void notifyDistributedJobFailure(long predestributedId, String 
nodeId) throws Exception {
+        ReportDistributedJobFailureFunction fn = new 
ReportDistributedJobFailureFunction(predestributedId, nodeId);
         ensureIpcHandle().send(-1, fn, null);
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 68a5b76..e703acc 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -18,8 +18,6 @@
  */
 package org.apache.hyracks.control.common.ipc;
 
-import static org.apache.hyracks.control.common.ipc.CCNCFunctions.*;
-
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.List;
@@ -37,6 +35,18 @@
 import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.partitions.PartitionId;
 import org.apache.hyracks.control.common.base.INodeController;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.AbortTasksFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.CleanupJobletFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.DeployBinaryFunction;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.DestroyJobFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.DistributeJobFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.ReportPartitionAvailabilityFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.SendApplicationMessageFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.ShutdownRequestFunction;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.StartTasksFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.StateDumpRequestFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.ThreadDumpRequestFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.UnDeployBinaryFunction;
 import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
 import org.apache.hyracks.ipc.impl.IPCSystem;
 
@@ -60,9 +70,9 @@
     @Override
     public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] 
planBytes,
             List<TaskAttemptDescriptor> taskDescriptors, 
Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies,
-            Set<JobFlag> flags) throws Exception {
+            Set<JobFlag> flags, Map<byte[], byte[]> jobParameters, long 
predistributedId) throws Exception {
         StartTasksFunction stf = new StartTasksFunction(deploymentId, jobId, 
planBytes,
-                taskDescriptors, connectorPolicies, flags);
+                taskDescriptors, connectorPolicies, flags, jobParameters, 
predistributedId);
         ensureIpcHandle().send(-1, stf, null);
     }
 
@@ -98,14 +108,14 @@
     }
 
     @Override
-    public void distributeJob(JobId jobId, byte[] planBytes) throws Exception {
-        DistributeJobFunction fn = new DistributeJobFunction(jobId, planBytes);
+    public void distributeJob(long predistributedId, byte[] planBytes) throws 
Exception {
+        DistributeJobFunction fn = new DistributeJobFunction(predistributedId, 
planBytes);
         ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
-    public void destroyJob(JobId jobId) throws Exception {
-        DestroyJobFunction fn = new DestroyJobFunction(jobId);
+    public void destroyJob(long predistributedId) throws Exception {
+        DestroyJobFunction fn = new DestroyJobFunction(predistributedId);
         ensureIpcHandle().send(-1, fn, null);
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index c416942..4ab14f4 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -62,8 +62,10 @@
                 return;
             case START_TASKS:
                 CCNCFunctions.StartTasksFunction stf = 
(CCNCFunctions.StartTasksFunction) fn;
-                ncs.getWorkQueue().schedule(new StartTasksWork(ncs, 
stf.getDeploymentId(), stf.getJobId(),
-                        stf.getPlanBytes(), stf.getTaskDescriptors(), 
stf.getConnectorPolicies(), stf.getFlags()));
+                ncs.getWorkQueue()
+                        .schedule(new StartTasksWork(ncs, 
stf.getDeploymentId(), stf.getJobId(), stf.getPlanBytes(),
+                                stf.getTaskDescriptors(), 
stf.getConnectorPolicies(), stf.getFlags(),
+                                stf.getJobParameters(), 
stf.getPredistributedId()));
                 return;
             case ABORT_TASKS:
                 CCNCFunctions.AbortTasksFunction atf = 
(CCNCFunctions.AbortTasksFunction) fn;
@@ -103,12 +105,12 @@
 
             case DISTRIBUTE_JOB:
                 CCNCFunctions.DistributeJobFunction djf = 
(CCNCFunctions.DistributeJobFunction) fn;
-                ncs.getWorkQueue().schedule(new DistributeJobWork(ncs, 
djf.getJobId(), djf.getacgBytes()));
+                ncs.getWorkQueue().schedule(new DistributeJobWork(ncs, 
djf.getPredistributedId(), djf.getacgBytes()));
                 return;
 
             case DESTROY_JOB:
                 CCNCFunctions.DestroyJobFunction dsjf = 
(CCNCFunctions.DestroyJobFunction) fn;
-                ncs.getWorkQueue().schedule(new DestroyJobWork(ncs, 
dsjf.getJobId()));
+                ncs.getWorkQueue().schedule(new DestroyJobWork(ncs, 
dsjf.getPredistributedId()));
                 return;
 
             case STATE_DUMP_REQUEST:
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index a426d47..eab40d6 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -30,6 +30,7 @@
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
@@ -54,6 +55,7 @@
 import org.apache.hyracks.api.io.IODeviceHandle;
 import org.apache.hyracks.api.job.ActivityClusterGraph;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobParameterByteStore;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
 import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
 import org.apache.hyracks.api.service.IControllerService;
@@ -125,7 +127,9 @@
 
     private final Map<JobId, Joblet> jobletMap;
 
-    private final Map<JobId, ActivityClusterGraph> 
preDistributedJobActivityClusterGraphMap;
+    private final Map<Long, ActivityClusterGraph> 
preDistributedJobActivityClusterGraphMap;
+
+    private final Map<JobId, JobParameterByteStore> jobParameterByteStoreMap = 
new HashMap<>();
 
     private ExecutorService executor;
 
@@ -417,28 +421,42 @@
         return jobletMap;
     }
 
-    public void storeActivityClusterGraph(JobId jobId, ActivityClusterGraph 
acg) throws HyracksException {
-        if (preDistributedJobActivityClusterGraphMap.get(jobId) != null) {
-            throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, 
jobId);
-        }
-        preDistributedJobActivityClusterGraphMap.put(jobId, acg);
+    public void removeJobParameterByteStore(JobId jobId) {
+        jobParameterByteStoreMap.remove(jobId);
     }
 
-    public void removeActivityClusterGraph(JobId jobId) throws 
HyracksException {
-        if (preDistributedJobActivityClusterGraphMap.get(jobId) == null) {
-            throw 
HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId);
+    public JobParameterByteStore createOrGetJobParameterByteStore(JobId jobId) 
throws HyracksException {
+        JobParameterByteStore jpbs = jobParameterByteStoreMap.get(jobId);
+        if (jpbs == null){
+            jpbs = new JobParameterByteStore();
+            jobParameterByteStoreMap.put(jobId, jpbs);
         }
-        preDistributedJobActivityClusterGraphMap.remove(jobId);
+        return jpbs;
     }
 
-    public void checkForDuplicateDistributedJob(JobId jobId) throws 
HyracksException {
-        if (preDistributedJobActivityClusterGraphMap.get(jobId) != null) {
-            throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, 
jobId);
+
+    public void storeActivityClusterGraph(long predestributedId, 
ActivityClusterGraph acg) throws HyracksException {
+        if (preDistributedJobActivityClusterGraphMap.get(predestributedId) != 
null) {
+            throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, 
predestributedId);
+        }
+        preDistributedJobActivityClusterGraphMap.put(predestributedId, acg);
+    }
+
+    public void removeActivityClusterGraph(long predestributedId) throws 
HyracksException {
+        if (preDistributedJobActivityClusterGraphMap.get(predestributedId) == 
null) {
+            throw 
HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, 
predestributedId);
+        }
+        preDistributedJobActivityClusterGraphMap.remove(predestributedId);
+    }
+
+    public void checkForDuplicateDistributedJob(long predestributedId) throws 
HyracksException {
+        if (preDistributedJobActivityClusterGraphMap.get(predestributedId) != 
null) {
+            throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, 
predestributedId);
         }
     }
 
-    public ActivityClusterGraph getActivityClusterGraph(JobId jobId) throws 
HyracksException {
-        return preDistributedJobActivityClusterGraphMap.get(jobId);
+    public ActivityClusterGraph getActivityClusterGraph(long predistributedId) 
throws HyracksException {
+        return preDistributedJobActivityClusterGraphMap.get(predistributedId);
     }
 
     public NetworkManager getNetworkManager() {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index bff2794..66b0b33 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -450,6 +450,9 @@
     }
 
     @Override
+    public byte[] getJobParameter(byte[] name, int start, int length) throws 
HyracksException {
+        return 
ncs.createOrGetJobParameterByteStore(joblet.getJobId()).getParameterValue(name, 
start, length);
+    }
     public Set<JobFlag> getJobFlags() {
         return jobFlags;
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
index 670ce06..03ae90c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
@@ -51,6 +51,7 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Cleaning up after job: " + jobId);
         }
+        ncs.removeJobParameterByteStore(jobId);
         final List<IPartition> unregisteredPartitions = new 
ArrayList<IPartition>();
         ncs.getPartitionManager().unregisterPartitions(jobId, 
unregisteredPartitions);
         ncs.getExecutor().execute(new Runnable() {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java
index 55dd01e..77cc37e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java
@@ -20,7 +20,6 @@
 package org.apache.hyracks.control.nc.work;
 
 import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.common.work.AbstractWork;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
@@ -31,20 +30,20 @@
 public class DestroyJobWork extends AbstractWork {
 
     private final NodeControllerService ncs;
-    private final JobId jobId;
+    private final long predistributedId;
 
-    public DestroyJobWork(NodeControllerService ncs, JobId jobId) {
+    public DestroyJobWork(NodeControllerService ncs, long predistributedId) {
         this.ncs = ncs;
-        this.jobId = jobId;
+        this.predistributedId = predistributedId;
     }
 
     @Override
     public void run() {
         try {
-            ncs.removeActivityClusterGraph(jobId);
+            ncs.removeActivityClusterGraph(predistributedId);
         } catch (HyracksException e) {
             try {
-                ncs.getClusterController().notifyDistributedJobFailure(jobId, 
ncs.getId());
+                
ncs.getClusterController().notifyDistributedJobFailure(predistributedId, 
ncs.getId());
             } catch (Exception e1) {
                 e1.printStackTrace();
             }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
index 486a420..e9b7f1b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
@@ -21,7 +21,6 @@
 
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.ActivityClusterGraph;
-import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.common.deployment.DeploymentUtils;
 import org.apache.hyracks.control.common.work.AbstractWork;
 import org.apache.hyracks.control.nc.NodeControllerService;
@@ -34,25 +33,24 @@
 
     private final NodeControllerService ncs;
     private final byte[] acgBytes;
-    private final JobId jobId;
+    private final long predestributedId;
 
-    public DistributeJobWork(NodeControllerService ncs, JobId jobId, byte[] 
acgBytes) {
+    public DistributeJobWork(NodeControllerService ncs, long predestributedId, 
byte[] acgBytes) {
         this.ncs = ncs;
-        this.jobId = jobId;
+        this.predestributedId = predestributedId;
         this.acgBytes = acgBytes;
     }
 
     @Override
     public void run() {
         try {
-            ncs.checkForDuplicateDistributedJob(jobId);
-            ncs.updateMaxJobId(jobId);
+            ncs.checkForDuplicateDistributedJob(predestributedId);
             ActivityClusterGraph acg =
                     (ActivityClusterGraph) 
DeploymentUtils.deserialize(acgBytes, null, ncs.getContext());
-            ncs.storeActivityClusterGraph(jobId, acg);
+            ncs.storeActivityClusterGraph(predestributedId, acg);
         } catch (HyracksException e) {
             try {
-                ncs.getClusterController().notifyDistributedJobFailure(jobId, 
ncs.getId());
+                
ncs.getClusterController().notifyDistributedJobFailure(predestributedId, 
ncs.getId());
             } catch (Exception e1) {
                 e1.printStackTrace();
             }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index c369781..76a146d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -79,6 +79,8 @@
 
     private final JobId jobId;
 
+    private final long predistributedId;
+
     private final byte[] acgBytes;
 
     private final List<TaskAttemptDescriptor> taskDescriptors;
@@ -87,16 +89,21 @@
 
     private final Set<JobFlag> flags;
 
+    private final Map<byte[], byte[]> jobParameters;
+
     public StartTasksWork(NodeControllerService ncs, DeploymentId 
deploymentId, JobId jobId, byte[] acgBytes,
             List<TaskAttemptDescriptor> taskDescriptors,
-            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap, 
Set<JobFlag> flags) {
+            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap, 
Set<JobFlag> flags,
+            Map<byte[], byte[]> jobParameters, long predistributedId) {
         this.ncs = ncs;
         this.deploymentId = deploymentId;
         this.jobId = jobId;
+        this.predistributedId = predistributedId;
         this.acgBytes = acgBytes;
         this.taskDescriptors = taskDescriptors;
         this.connectorPoliciesMap = connectorPoliciesMap;
         this.flags = flags;
+        this.jobParameters = jobParameters;
     }
 
     @Override
@@ -106,7 +113,7 @@
         try {
             ncs.updateMaxJobId(jobId);
             NCServiceContext serviceCtx = ncs.getContext();
-            Joblet joblet = getOrCreateLocalJoblet(deploymentId, jobId, 
serviceCtx, acgBytes);
+            Joblet joblet = getOrCreateLocalJoblet(deploymentId, 
predistributedId, serviceCtx, acgBytes);
             final ActivityClusterGraph acg = joblet.getActivityClusterGraph();
             IRecordDescriptorProvider rdp = new IRecordDescriptorProvider() {
                 @Override
@@ -190,18 +197,19 @@
         }
     }
 
-    private Joblet getOrCreateLocalJoblet(DeploymentId deploymentId, JobId 
jobId, INCServiceContext appCtx,
+    private Joblet getOrCreateLocalJoblet(DeploymentId deploymentId, long 
destributedId, INCServiceContext appCtx,
             byte[] acgBytes) throws HyracksException {
         Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
         Joblet ji = jobletMap.get(jobId);
         if (ji == null) {
-            ActivityClusterGraph acg = ncs.getActivityClusterGraph(jobId);
+            ActivityClusterGraph acg = 
ncs.getActivityClusterGraph(destributedId);
             if (acg == null) {
                 if (acgBytes == null) {
                     throw 
HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId);
                 }
                 acg = (ActivityClusterGraph) 
DeploymentUtils.deserialize(acgBytes, deploymentId, appCtx);
             }
+            
ncs.createOrGetJobParameterByteStore(jobId).setParameters(jobParameters);
             ji = new Joblet(ncs, deploymentId, jobId, appCtx, acg);
             jobletMap.put(jobId, ji);
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
index 4a01fdb..d8ce841 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
@@ -23,6 +23,7 @@
 import static org.mockito.Mockito.verify;
 
 import java.io.File;
+import java.util.HashMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -39,6 +40,7 @@
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Matchers;
 import org.mockito.Mockito;
 
 public class PredistributedJobsTest {
@@ -108,62 +110,77 @@
         JobSpecification spec2 = HeapSortMergeTest.createSortMergeJobSpec();
 
         //distribute both jobs
-        JobId jobId1 = hcc.distributeJob(spec1);
-        JobId jobId2 = hcc.distributeJob(spec2);
+        long distributedId1 = hcc.distributeJob(spec1);
+        long distributedId2 = hcc.distributeJob(spec2);
 
         //make sure it finished
         //cc will get the store once to check for duplicate insertion and once 
to insert per job
         verify(cc, Mockito.timeout(5000).times(4)).getPreDistributedJobStore();
-        verify(nc1, 
Mockito.timeout(5000).times(2)).storeActivityClusterGraph(any(), any());
-        verify(nc2, 
Mockito.timeout(5000).times(2)).storeActivityClusterGraph(any(), any());
-        verify(nc1, 
Mockito.timeout(5000).times(2)).checkForDuplicateDistributedJob(any());
-        verify(nc2, 
Mockito.timeout(5000).times(2)).checkForDuplicateDistributedJob(any());
+        verify(nc1, 
Mockito.timeout(5000).times(2)).storeActivityClusterGraph(Matchers.anyLong(), 
any());
+        verify(nc2, 
Mockito.timeout(5000).times(2)).storeActivityClusterGraph(Matchers.anyLong(), 
any());
+        verify(nc1, 
Mockito.timeout(5000).times(2)).checkForDuplicateDistributedJob(Matchers.anyLong());
+        verify(nc2, 
Mockito.timeout(5000).times(2)).checkForDuplicateDistributedJob(Matchers.anyLong());
 
         //confirm that both jobs are distributed
-        Assert.assertTrue(nc1.getActivityClusterGraph(jobId1) != null && 
nc2.getActivityClusterGraph(jobId1) != null);
-        Assert.assertTrue(nc1.getActivityClusterGraph(jobId2) != null && 
nc2.getActivityClusterGraph(jobId2) != null);
-        
Assert.assertTrue(cc.getPreDistributedJobStore().getDistributedJobDescriptor(jobId1)
 != null);
-        
Assert.assertTrue(cc.getPreDistributedJobStore().getDistributedJobDescriptor(jobId2)
 != null);
+        Assert.assertTrue(nc1.getActivityClusterGraph(distributedId1) != null 
&& nc2.getActivityClusterGraph(distributedId1) != null);
+        Assert.assertTrue(nc1.getActivityClusterGraph(distributedId2) != null 
&& nc2.getActivityClusterGraph(distributedId2) != null);
+        
Assert.assertTrue(cc.getPreDistributedJobStore().getDistributedJobDescriptor(distributedId1)
 != null);
+        
Assert.assertTrue(cc.getPreDistributedJobStore().getDistributedJobDescriptor(distributedId2)
 != null);
 
         //run the first job
-        hcc.startJob(jobId1);
-        hcc.waitForCompletion(jobId1);
+        JobId jobRunId1 = hcc.startJob(distributedId1, new HashMap<>());
+        hcc.waitForCompletion(jobRunId1);
+
+        //Make sure the job parameter map was removed
+        verify(cc, 
Mockito.timeout(5000).times(1)).removeJobParameterByteStore(any());
+        verify(nc1, 
Mockito.timeout(5000).times(1)).removeJobParameterByteStore(any());
+        verify(nc2, 
Mockito.timeout(5000).times(1)).removeJobParameterByteStore(any());
 
         //destroy the first job
-        hcc.destroyJob(jobId1);
+        hcc.destroyJob(distributedId1);
 
         //make sure it finished
         verify(cc, Mockito.timeout(5000).times(8)).getPreDistributedJobStore();
-        verify(nc1, 
Mockito.timeout(5000).times(1)).removeActivityClusterGraph(any());
-        verify(nc2, 
Mockito.timeout(5000).times(1)).removeActivityClusterGraph(any());
+        verify(nc1, 
Mockito.timeout(5000).times(1)).removeActivityClusterGraph(Matchers.anyLong());
+        verify(nc2, 
Mockito.timeout(5000).times(1)).removeActivityClusterGraph(Matchers.anyLong());
 
         //confirm the first job is destroyed
-        Assert.assertTrue(nc1.getActivityClusterGraph(jobId1) == null && 
nc2.getActivityClusterGraph(jobId1) == null);
-        
cc.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(jobId1);
+        Assert.assertTrue(nc1.getActivityClusterGraph(distributedId1) == null 
&& nc2.getActivityClusterGraph(distributedId1) == null);
+        
cc.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(distributedId1);
 
         //run the second job
-        hcc.startJob(jobId2);
-        hcc.waitForCompletion(jobId2);
+        JobId jobRunId2 = hcc.startJob(distributedId2, new HashMap<>());
+        hcc.waitForCompletion(jobRunId2);
+
+        //Make sure the job parameter map was removed
+        verify(cc, 
Mockito.timeout(5000).times(2)).removeJobParameterByteStore(any());
+        verify(nc1, 
Mockito.timeout(5000).times(2)).removeJobParameterByteStore(any());
+        verify(nc2, 
Mockito.timeout(5000).times(2)).removeJobParameterByteStore(any());
 
         //wait ten seconds to ensure the result sweeper does not break the job
         //The result sweeper runs every 5 seconds during the tests
         Thread.sleep(10000);
 
         //run the second job again
-        hcc.startJob(jobId2);
-        hcc.waitForCompletion(jobId2);
+        JobId jobRunId3 = hcc.startJob(distributedId2, new HashMap<>());
+        hcc.waitForCompletion(jobRunId3);
+
+        //Make sure the job parameter map was removed
+        verify(cc, 
Mockito.timeout(5000).times(3)).removeJobParameterByteStore(any());
+        verify(nc1, 
Mockito.timeout(5000).times(3)).removeJobParameterByteStore(any());
+        verify(nc2, 
Mockito.timeout(5000).times(3)).removeJobParameterByteStore(any());
 
         //destroy the second job
-        hcc.destroyJob(jobId2);
+        hcc.destroyJob(distributedId2);
 
         //make sure it finished
         verify(cc, 
Mockito.timeout(5000).times(12)).getPreDistributedJobStore();
-        verify(nc1, 
Mockito.timeout(5000).times(2)).removeActivityClusterGraph(any());
-        verify(nc2, 
Mockito.timeout(5000).times(2)).removeActivityClusterGraph(any());
+        verify(nc1, 
Mockito.timeout(5000).times(2)).removeActivityClusterGraph(Matchers.anyLong());
+        verify(nc2, 
Mockito.timeout(5000).times(2)).removeActivityClusterGraph(Matchers.anyLong());
 
         //confirm the second job is destroyed
-        Assert.assertTrue(nc1.getActivityClusterGraph(jobId2) == null && 
nc2.getActivityClusterGraph(jobId2) == null);
-        
cc.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(jobId2);
+        Assert.assertTrue(nc1.getActivityClusterGraph(distributedId2) == null 
&& nc2.getActivityClusterGraph(distributedId2) == null);
+        
cc.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(distributedId2);
     }
 
     @AfterClass
diff --git 
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
 
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index b100300..e9c9cff 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -160,6 +160,10 @@
     }
 
     @Override
+    public byte[] getJobParameter(byte[] name, int start, int length) {
+        return new byte[0];
+    }
+
     public Set<JobFlag> getJobFlags() {
         return EnumSet.noneOf(JobFlag.class);
     }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2045
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I8f493c1fa977d07dfe8a875f9ebe9515d01d1473
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <[email protected]>

Reply via email to