Steven Jacobs has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2045
Change subject: Long Running Jobs Changes
......................................................................
Long Running Jobs Changes
Change-Id: I8f493c1fa977d07dfe8a875f9ebe9515d01d1473
---
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveRedundantSelectRule.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
A asterixdb/asterix-app/src/test/resources/optimizerts/queries_sqlpp/not-is-missing-constant.sqlpp
A asterixdb/asterix-app/src/test/resources/optimizerts/results/not-is-missing-constant.plan
A asterixdb/asterix-app/src/test/resources/optimizerts/results_parser_sqlpp/not-is-missing-constant.ast
M asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
M asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java
A asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterDescriptor.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterGraph.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterId.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
A hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobParameterByteStore.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributedJobFailureWork.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
M hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
49 files changed, 698 insertions(+), 270 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/45/2045/1
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index d94a045..f9245a9 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -218,6 +218,7 @@
condPushDownAndJoinInference.add(new RemoveRedundantVariablesRule());
condPushDownAndJoinInference.add(new AsterixInlineVariablesRule());
+ condPushDownAndJoinInference.add(new RemoveRedundantSelectRule());
condPushDownAndJoinInference.add(new RemoveUnusedAssignAndAggregateRule());
condPushDownAndJoinInference.add(new FactorRedundantGroupAndDecorVarsRule());
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveRedundantSelectRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveRedundantSelectRule.java
index fe9e49e..a27b143 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveRedundantSelectRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveRedundantSelectRule.java
@@ -19,15 +19,17 @@
package org.apache.asterix.optimizer.rules;
-import org.apache.commons.lang3.mutable.Mutable;
-
import org.apache.asterix.om.base.ABoolean;
import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
@@ -57,7 +59,7 @@
}
SelectOperator select = (SelectOperator) op;
ILogicalExpression cond = select.getCondition().getValue();
- if (alwaysHold(cond)) {
+ if (alwaysHold(cond, context)) {
opRef.setValue(select.getInputs().get(0).getValue());
return true;
}
@@ -70,13 +72,31 @@
* @param cond
* @return true if the condition always holds; false otherwise.
*/
- private boolean alwaysHold(ILogicalExpression cond) {
+ private boolean alwaysHold(ILogicalExpression cond, IOptimizationContext context) {
if (cond.equals(ConstantExpression.TRUE)) {
return true;
}
if (cond.equals(new ConstantExpression(new AsterixConstantValue(ABoolean.TRUE)))) {
return true;
}
+ /* is-missing is always false for constant values
+ * so not(is-missing(CONST)) is always true
+ * this situation arises when a left outer join with an "is-missing" filtering function call
+ * is converted to an inner join. See PushSelectIntoJoinRule
+ */
+ if (cond.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL
+         && ((AbstractFunctionCallExpression) cond).getFunctionIdentifier() == BuiltinFunctions.NOT) {
+     ILogicalExpression subExpr = ((AbstractFunctionCallExpression) cond).getArguments().get(0).getValue();
+     if (subExpr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL
+             && ((AbstractFunctionCallExpression) subExpr).getFunctionIdentifier() == BuiltinFunctions.IS_MISSING) {
+         subExpr = ((AbstractFunctionCallExpression) subExpr).getArguments().get(0).getValue();
+         if (subExpr.getExpressionTag() == LogicalExpressionTag.CONSTANT) {
+             return true;
+         }
+     }
+ }
return false;
}
}
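To make the intent of the new branch easier to follow, here is a stand-alone sketch of the same pattern match using simplified stand-in types (Expr, Tag, and alwaysHolds are illustrative only, not the Algebricks API the rule actually uses). It targets the situation described in the comment above: PushSelectIntoJoinRule can turn a left outer join plus an is-missing filter into an inner join guarded by not(is-missing(CONST)), which always holds.

    import java.util.Arrays;
    import java.util.List;

    // Stand-alone model of the check added to RemoveRedundantSelectRule, using
    // simplified stand-in types instead of the real Algebricks expression classes.
    public class NotIsMissingCheckSketch {

        enum Tag { CONSTANT, FUNCTION_CALL, VARIABLE }

        // Minimal expression node: a tag, a function name (for calls), and arguments.
        static final class Expr {
            final Tag tag;
            final String function;
            final List<Expr> args;

            Expr(Tag tag, String function, Expr... args) {
                this.tag = tag;
                this.function = function;
                this.args = Arrays.asList(args);
            }
        }

        // Mirrors the new pattern match: not(is-missing(<constant>)) always holds,
        // so a SELECT guarded by it can be dropped.
        static boolean alwaysHolds(Expr cond) {
            if (cond.tag == Tag.FUNCTION_CALL && "not".equals(cond.function)) {
                Expr sub = cond.args.get(0);
                if (sub.tag == Tag.FUNCTION_CALL && "is-missing".equals(sub.function)) {
                    return sub.args.get(0).tag == Tag.CONSTANT;
                }
            }
            return false;
        }

        public static void main(String[] args) {
            Expr constant = new Expr(Tag.CONSTANT, null);
            Expr variable = new Expr(Tag.VARIABLE, null);
            Expr redundant = new Expr(Tag.FUNCTION_CALL, "not", new Expr(Tag.FUNCTION_CALL, "is-missing", constant));
            Expr kept = new Expr(Tag.FUNCTION_CALL, "not", new Expr(Tag.FUNCTION_CALL, "is-missing", variable));
            System.out.println(alwaysHolds(redundant)); // true  -> the SELECT is removed
            System.out.println(alwaysHolds(kept));      // false -> the SELECT stays
        }
    }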
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 29135d3..bba42bb 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -731,7 +731,7 @@
return DatasetUtil.createNodeGroupForNewDataset(dataverseName, datasetName, selectedNodes, metadataProvider);
}
- protected void handleCreateIndexStatement(MetadataProvider metadataProvider, Statement stmt,
+ public void handleCreateIndexStatement(MetadataProvider metadataProvider, Statement stmt,
        IHyracksClientConnection hcc) throws Exception {
CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
String dataverseName = getActiveDataverse(stmtCreateIndex.getDataverseName());
@@ -1623,7 +1623,7 @@
CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
String dataverse = getActiveDataverseName(cfs.getSignature().getNamespace());
cfs.getSignature().setNamespace(dataverse);
- String functionName = cfs.getaAterixFunction().getName();
+ String functionName = cfs.getSignature().getName();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -1634,7 +1634,9 @@
if (dv == null) {
throw new AlgebricksException("There is no dataverse with this
name " + dataverse + ".");
}
- Function function = new Function(dataverse, functionName,
cfs.getaAterixFunction().getArity(),
+
+ // If the function body contains function calls, theirs reference
count won't be increased.
+ Function function = new Function(dataverse, functionName,
cfs.getSignature().getArity(),
cfs.getParamList(), Function.RETURNTYPE_VOID,
cfs.getFunctionBody(), Function.LANGUAGE_AQL,
FunctionKind.SCALAR.toString());
MetadataManager.INSTANCE.addFunction(mdTxnCtx, function);
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries_sqlpp/not-is-missing-constant.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries_sqlpp/not-is-missing-constant.sqlpp
new file mode 100644
index 0000000..2bdf1d9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries_sqlpp/not-is-missing-constant.sqlpp
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ //This test ensures that not(is-missing(CONST)) is handled properly
+
+drop dataverse channels if exists;
+create dataverse channels;
+use channels;
+
+create type userLocation as {
+ userId: int,
+ roomNumber: int
+};
+
+create type result as {
+ id:uuid
+};
+
+create type subscriptionType as {
+ subscriptionId:uuid,
+ param0:int
+};
+
+create dataset roomRecordsResults(result)
+primary key id autogenerated;
+
+create dataset roomRecordsSubscriptions(subscriptionType)
+primary key subscriptionId autogenerated;
+
+create dataset UserLocations(userLocation)
+primary key userId;
+
+create function RoomOccupants(room) {
+ (select location.userId
+ from UserLocations location
+ where location.roomNumber = room)
+};
+
+use channels;
+SET inline_with "false";
+insert into channels.roomRecordsResults as a (
+ with channelExecutionTime as current_datetime()
+ select result, channelExecutionTime, sub.subscriptionId as subscriptionId, current_datetime() as deliveryTime
+ from channels.roomRecordsSubscriptions sub,
+ channels.RoomOccupants(sub.param0) result
+) returning a;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/not-is-missing-constant.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/not-is-missing-constant.plan
new file mode 100644
index 0000000..709b2b0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/not-is-missing-constant.plan
@@ -0,0 +1,32 @@
+-- DISTRIBUTE_RESULT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- COMMIT |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- INSERT_DELETE |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$19] |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- HYBRID_HASH_JOIN [$$26][$$25] |PARTITIONED|
-- HASH_PARTITION_EXCHANGE [$$26] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
-- BROADCAST_EXCHANGE |PARTITIONED|
-- ASSIGN |UNPARTITIONED|
-- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
-- HASH_PARTITION_EXCHANGE [$$25] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- EMPTY_TUPLE_SOURCE |PARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results_parser_sqlpp/not-is-missing-constant.ast b/asterixdb/asterix-app/src/test/resources/optimizerts/results_parser_sqlpp/not-is-missing-constant.ast
new file mode 100644
index 0000000..847539c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results_parser_sqlpp/not-is-missing-constant.ast
@@ -0,0 +1,23 @@
+DataverseUse channels
+TypeDecl userLocation [
+ open RecordType {
+ userId : int64,
+ roomNumber : int64
+ }
+]
+TypeDecl result [
+ open RecordType {
+ id : uuid
+ }
+]
+TypeDecl subscriptionType [
+ open RecordType {
+ subscriptionId : uuid,
+ param0 : int64
+ }
+]
+DatasetDecl roomRecordsResults(result) partitioned by [[id]] [autogenerated]
+DatasetDecl roomRecordsSubscriptions(subscriptionType) partitioned by [[subscriptionId]] [autogenerated]
+DatasetDecl UserLocations(userLocation) partitioned by [[userId]]
+DataverseUse channels
+Set inline_with=false
\ No newline at end of file
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
index 6d74957..2b7cad6 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
@@ -34,10 +34,6 @@
private final boolean ifNotExists;
private final List<String> paramList;
- public FunctionSignature getaAterixFunction() {
- return signature;
- }
-
public String getFunctionBody() {
return functionBody;
}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
index 863847b..7addf9d 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
@@ -864,6 +864,9 @@
public static final FunctionIdentifier EXTERNAL_LOOKUP = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "external-lookup", FunctionIdentifier.VARARGS);
+ public static final FunctionIdentifier GET_JOB_PARAMETER =
+         new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "get-job-param", 1);
+
public static final FunctionIdentifier META = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "meta", FunctionIdentifier.VARARGS);
public static final FunctionIdentifier META_KEY = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "meta-key",
@@ -1274,6 +1277,9 @@
// external lookup
addPrivateFunction(EXTERNAL_LOOKUP, AnyTypeComputer.INSTANCE, false);
+ // get job parameter
+ addFunction(GET_JOB_PARAMETER, AnyTypeComputer.INSTANCE, false);
+
// unnesting function
addPrivateFunction(SCAN_COLLECTION, CollectionMemberResultType.INSTANCE, true);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java
index 1f9909c..4a421cc 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java
@@ -22,8 +22,8 @@
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.asterix.runtime.exceptions.TypeMismatchException;
import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.runtime.exceptions.TypeMismatchException;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
@@ -48,7 +48,7 @@
final GrowableArray resultArray = new GrowableArray();
final UTF8StringBuilder resultBuilder = new UTF8StringBuilder();
private final ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
- private final DataOutput dataOutput = resultStorage.getDataOutput();
+ protected final DataOutput dataOutput = resultStorage.getDataOutput();
private final FunctionIdentifier funcID;
AbstractUnaryStringStringEval(IHyracksTaskContext context, IScalarEvaluatorFactory argEvalFactory,
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterDescriptor.java
new file mode 100644
index 0000000..9d73dcc
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterDescriptor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.evaluators.functions;
+
+import java.io.IOException;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptor;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
+
+public class GetJobParameterDescriptor extends AbstractScalarFunctionDynamicDescriptor {
+ private static final long serialVersionUID = 1L;
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ @Override
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new GetJobParameterDescriptor();
+ }
+ };
+
+ @Override
+ public IScalarEvaluatorFactory createEvaluatorFactory(IScalarEvaluatorFactory[] args) {
+ return new IScalarEvaluatorFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IScalarEvaluator createScalarEvaluator(IHyracksTaskContext ctx) throws HyracksDataException {
+ return new AbstractUnaryStringStringEval(ctx, args[0],
+         GetJobParameterDescriptor.this.getIdentifier()) {
+ private byte[] result;
+
+ @Override
+ protected void process(UTF8StringPointable inputString, IPointable resultPointable)
+         throws IOException {
+ result = ctx.getJobParameter(inputString.getByteArray(), inputString.getStartOffset(),
+         inputString.getLength());
+ }
+
+ @Override
+ void writeResult(IPointable resultPointable) throws IOException {
+ resultPointable.set(result, 0, result.length);
+ }
+ };
+ }
+ };
+ }
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return BuiltinFunctions.GET_JOB_PARAMETER;
+ }
+
+}
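A rough, self-contained model of the data flow behind get-job-param: the single string argument is a parameter name, the evaluator forwards its raw bytes to the task context, and whatever bytes the context returns become the result. The TaskContext interface and the "threshold" parameter below are stand-ins, not the real IHyracksTaskContext; the actual evaluator works on UTF8StringPointable bytes rather than java.lang.String.

    import java.nio.charset.StandardCharsets;
    import java.util.Map;

    // Conceptual model of the get-job-param evaluator, with a stand-in context
    // instead of the real IHyracksTaskContext.
    public class GetJobParameterSketch {
        // Stand-in for the slice-based lookup the real evaluator performs via
        // IHyracksTaskContext.getJobParameter(byte[], int, int).
        interface TaskContext {
            byte[] getJobParameter(byte[] name, int start, int length);
        }

        public static void main(String[] args) {
            Map<String, byte[]> params =
                    Map.of("threshold", "42".getBytes(StandardCharsets.UTF_8));
            TaskContext ctx = (name, start, length) ->
                    params.getOrDefault(new String(name, start, length, StandardCharsets.UTF_8), new byte[0]);

            byte[] key = "threshold".getBytes(StandardCharsets.UTF_8);
            // The evaluator simply forwards the argument bytes and returns whatever
            // the context hands back, without copying or re-encoding.
            byte[] value = ctx.getJobParameter(key, 0, key.length);
            System.out.println(new String(value, StandardCharsets.UTF_8)); // 42
        }
    }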
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
index c02732f..d831050 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
@@ -158,6 +158,7 @@
import org.apache.asterix.runtime.evaluators.functions.FullTextContainsDescriptor;
import org.apache.asterix.runtime.evaluators.functions.FullTextContainsWithoutOptionDescriptor;
import org.apache.asterix.runtime.evaluators.functions.GetItemDescriptor;
+import org.apache.asterix.runtime.evaluators.functions.GetJobParameterDescriptor;
import org.apache.asterix.runtime.evaluators.functions.GramTokensDescriptor;
import org.apache.asterix.runtime.evaluators.functions.HashedGramTokensDescriptor;
import org.apache.asterix.runtime.evaluators.functions.HashedWordTokensDescriptor;
@@ -426,6 +427,9 @@
// Inject failure function
temp.add(InjectFailureDescriptor.FACTORY);
+ // Get Job Parameter function
+ temp.add(GetJobParameterDescriptor.FACTORY);
+
// Switch case
temp.add(SwitchCaseDescriptor.FACTORY);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index 95479c1..d183363 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -22,6 +22,8 @@
import java.net.URL;
import java.util.EnumSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.hyracks.api.dataset.DatasetDirectoryRecord;
import org.apache.hyracks.api.dataset.ResultSetId;
@@ -148,10 +150,10 @@
public static class DestroyJobFunction extends Function {
private static final long serialVersionUID = 1L;
- private final JobId jobId;
+ private final long predestributedId;
- public DestroyJobFunction(JobId jobId) {
- this.jobId = jobId;
+ public DestroyJobFunction(long predestributedId) {
+ this.predestributedId = predestributedId;
}
@Override
@@ -159,8 +161,8 @@
return FunctionId.DESTROY_JOB;
}
- public JobId getJobId() {
- return jobId;
+ public long getPredistributedId() {
+ return predestributedId;
}
}
@@ -168,27 +170,30 @@
private static final long serialVersionUID = 1L;
private final byte[] acggfBytes;
- private final EnumSet<JobFlag> jobFlags;
+ private final Set<JobFlag> jobFlags;
private final DeploymentId deploymentId;
- private final JobId jobId;
+ private final long predistributedId;
+ private final Map<byte[], byte[]> jobParameters;
- public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags, JobId jobId) {
+ public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, Set<JobFlag> jobFlags,
+         long predistributedId, Map<byte[], byte[]> jobParameters) {
this.acggfBytes = acggfBytes;
this.jobFlags = jobFlags;
this.deploymentId = deploymentId;
- this.jobId = jobId;
+ this.predistributedId = predistributedId;
+ this.jobParameters = jobParameters;
}
- public StartJobFunction(JobId jobId) {
- this(null, null, EnumSet.noneOf(JobFlag.class), jobId);
+ public StartJobFunction(long jobId, Map<byte[], byte[]> jobParameters) {
+ this(null, null, EnumSet.noneOf(JobFlag.class), jobId, jobParameters);
}
- public StartJobFunction(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) {
- this(null, acggfBytes, jobFlags, null);
+ public StartJobFunction(byte[] acggfBytes, Set<JobFlag> jobFlags) {
+ this(null, acggfBytes, jobFlags, -1, null);
}
- public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) {
- this(deploymentId, acggfBytes, jobFlags, null);
+ public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, Set<JobFlag> jobFlags) {
+ this(deploymentId, acggfBytes, jobFlags, -1, null);
}
@Override
@@ -196,15 +201,19 @@
return FunctionId.START_JOB;
}
- public JobId getJobId() {
- return jobId;
+ public Map<byte[], byte[]> getJobParameters() {
+ return jobParameters;
+ }
+
+ public long getpredistributedId() {
+ return predistributedId;
}
public byte[] getACGGFBytes() {
return acggfBytes;
}
- public EnumSet<JobFlag> getJobFlags() {
+ public Set<JobFlag> getJobFlags() {
return jobFlags;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index 0142c7d..bdb270c 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -76,9 +76,9 @@
}
@Override
- public JobId startJob(JobId jobId) throws Exception {
+ public JobId startJob(long predestributedId, Map<byte[], byte[]> jobParameters) throws Exception {
HyracksClientInterfaceFunctions.StartJobFunction sjf =
- new HyracksClientInterfaceFunctions.StartJobFunction(jobId);
+ new HyracksClientInterfaceFunctions.StartJobFunction(predestributedId, jobParameters);
return (JobId) rpci.call(ipcHandle, sjf);
}
@@ -90,17 +90,17 @@
}
@Override
- public JobId distributeJob(byte[] acggfBytes) throws Exception {
+ public long distributeJob(byte[] acggfBytes) throws Exception {
HyracksClientInterfaceFunctions.DistributeJobFunction sjf =
new HyracksClientInterfaceFunctions.DistributeJobFunction(acggfBytes);
- return (JobId) rpci.call(ipcHandle, sjf);
+ return (long) rpci.call(ipcHandle, sjf);
}
@Override
- public JobId destroyJob(JobId jobId) throws Exception {
+ public long destroyJob(long predestributedId) throws Exception {
HyracksClientInterfaceFunctions.DestroyJobFunction sjf =
- new HyracksClientInterfaceFunctions.DestroyJobFunction(jobId);
- return (JobId) rpci.call(ipcHandle, sjf);
+ new HyracksClientInterfaceFunctions.DestroyJobFunction(predestributedId);
+ return (long) rpci.call(ipcHandle, sjf);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index 75cbf61..ae7960c 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -108,27 +108,27 @@
}
@Override
- public JobId distributeJob(JobSpecification jobSpec) throws Exception {
- IActivityClusterGraphGeneratorFactory jsacggf =
+ public long distributeJob(JobSpecification jobSpec) throws Exception {
+ JobSpecificationActivityClusterGraphGeneratorFactory jsacggf =
new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec);
return distributeJob(jsacggf);
}
@Override
- public JobId destroyJob(JobId jobId) throws Exception {
- return hci.destroyJob(jobId);
+ public long destroyJob(long predestributedId) throws Exception {
+ return hci.destroyJob(predestributedId);
}
@Override
- public JobId startJob(JobId jobId) throws Exception {
- return hci.startJob(jobId);
+ public JobId startJob(long predestributedId, Map<byte[], byte[]> jobParameters) throws Exception {
+ return hci.startJob(predestributedId, jobParameters);
}
public JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) throws Exception {
return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags);
}
- public JobId distributeJob(IActivityClusterGraphGeneratorFactory acggf) throws Exception {
+ public long distributeJob(IActivityClusterGraphGeneratorFactory acggf) throws Exception {
return hci.distributeJob(JavaSerializationUtils.serialize(acggf));
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
index 0956d85..a29fe51 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
@@ -20,6 +20,7 @@
import java.util.EnumSet;
import java.util.List;
+import java.util.Map;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.deployment.DeploymentId;
@@ -94,25 +95,27 @@
* Flags
* @throws Exception
*/
- public JobId distributeJob(JobSpecification jobSpec) throws Exception;
+ public long distributeJob(JobSpecification jobSpec) throws Exception;
/**
* Destroy the distributed graph for a pre-distributed job
*
- * @param jobId
+ * @param predestributedId
* The id of the predistributed job
* @throws Exception
*/
- public JobId destroyJob(JobId jobId) throws Exception;
+ public long destroyJob(long predestributedId) throws Exception;
/**
* Used to run a pre-distributed job by id (the same JobId will be returned)
*
- * @param jobId
+ * @param predestributedId
* The id of the predistributed job
+ * @param jobParameters
+ * The serialized job parameters
* @throws Exception
*/
- public JobId startJob(JobId jobId) throws Exception;
- public JobId startJob(long predestributedId, Map<byte[], byte[]> jobParameters) throws Exception;
/**
* Start the specified Job.
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
index 1afbe9e..6b303be 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
@@ -38,13 +38,13 @@
public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception;
- public JobId startJob(JobId jobId) throws Exception;
+ public JobId startJob(long predestributedId, Map<byte[], byte[]> jobParameters) throws Exception;
public void cancelJob(JobId jobId) throws Exception;
- public JobId distributeJob(byte[] acggfBytes) throws Exception;
+ public long distributeJob(byte[] acggfBytes) throws Exception;
- public JobId destroyJob(JobId jobId) throws Exception;
+ public long destroyJob(long predestributedId) throws Exception;
public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception;
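Putting the client-facing changes together, the intended flow appears to be: distribute a job once, start it any number of times with per-run parameters, then destroy the distributed graph. Below is a sketch under that assumption, using this change's signatures; the method name runTwice and the parameter name/value are illustrative only, and waitForCompletion is the existing blocking wait on IHyracksClientConnection.

    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hyracks.api.client.IHyracksClientConnection;
    import org.apache.hyracks.api.job.JobId;
    import org.apache.hyracks.api.job.JobSpecification;

    public class PredistributedJobFlowSketch {
        static void runTwice(IHyracksClientConnection hcc, JobSpecification spec) throws Exception {
            // Distribute the compiled job once; the returned long identifies the
            // pre-distributed ActivityClusterGraph, not a particular run.
            long predistributedId = hcc.distributeJob(spec);

            // Each run supplies its own parameter bytes and gets a fresh JobId.
            Map<byte[], byte[]> params = new HashMap<>();
            params.put("threshold".getBytes(StandardCharsets.UTF_8),  // parameter name (illustrative)
                    "42".getBytes(StandardCharsets.UTF_8));           // parameter value (illustrative)

            JobId firstRun = hcc.startJob(predistributedId, params);
            hcc.waitForCompletion(firstRun);

            JobId secondRun = hcc.startJob(predistributedId, params);
            hcc.waitForCompletion(secondRun);

            // Tear down the distributed graph when no more runs are needed.
            hcc.destroyJob(predistributedId);
        }
    }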
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java
index 7dd5fe9..0b2cc9b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java
@@ -28,7 +28,6 @@
import java.util.logging.Logger;
import org.apache.commons.lang3.tuple.Pair;
-
import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.dataflow.IActivity;
import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
@@ -36,7 +35,6 @@
import org.apache.hyracks.api.job.ActivityClusterGraph;
import org.apache.hyracks.api.job.ActivityClusterId;
import org.apache.hyracks.api.job.JobActivityGraph;
-import org.apache.hyracks.api.job.JobId;
public class ActivityClusterGraphBuilder {
private static final Logger LOGGER = Logger.getLogger(ActivityClusterGraphBuilder.class.getName());
@@ -70,7 +68,7 @@
return null;
}
- public ActivityClusterGraph inferActivityClusters(JobId jobId, JobActivityGraph jag) {
+ public ActivityClusterGraph inferActivityClusters(JobActivityGraph jag) {
/*
* Build initial equivalence sets map. We create a map such that for each IOperatorTask, t -> { t }
*/
@@ -99,7 +97,7 @@
Map<ActivityId, IActivity> activityNodeMap = jag.getActivityMap();
List<ActivityCluster> acList = new ArrayList<ActivityCluster>();
for (Set<ActivityId> stage : stages) {
- ActivityCluster ac = new ActivityCluster(acg, new ActivityClusterId(jobId, acCounter++));
+ ActivityCluster ac = new ActivityCluster(acg, new ActivityClusterId(acCounter++));
acList.add(ac);
for (ActivityId aid : stage) {
IActivity activity = activityNodeMap.get(aid);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
index c712b36..ddf0ce8 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
@@ -32,7 +32,6 @@
import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
import org.apache.hyracks.api.job.JobActivityGraph;
import org.apache.hyracks.api.job.JobFlag;
-import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.rewriter.ActivityClusterGraphRewriter;
@@ -51,8 +50,8 @@
}
@Override
- public IActivityClusterGraphGenerator createActivityClusterGraphGenerator(JobId jobId,
-         final ICCServiceContext ccServiceCtx, Set<JobFlag> jobFlags) throws HyracksException {
+ public IActivityClusterGraphGenerator createActivityClusterGraphGenerator(final ICCServiceContext ccServiceCtx,
+ Set<JobFlag> jobFlags) throws HyracksException {
final JobActivityGraphBuilder builder = new JobActivityGraphBuilder(spec, jobFlags);
PlanUtils.visit(spec, new IConnectorDescriptorVisitor() {
@Override
@@ -70,7 +69,7 @@
final JobActivityGraph jag = builder.getActivityGraph();
ActivityClusterGraphBuilder acgb = new ActivityClusterGraphBuilder();
- final ActivityClusterGraph acg = acgb.inferActivityClusters(jobId, jag);
+ final ActivityClusterGraph acg = acgb.inferActivityClusters(jag);
acg.setFrameSize(spec.getFrameSize());
acg.setMaxReattempts(spec.getMaxReattempts());
acg.setJobletEventListenerFactory(spec.getJobletEventListenerFactory());
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
index df693b2..4358cbe 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
@@ -25,6 +25,7 @@
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.io.IWorkspaceFileFactory;
import org.apache.hyracks.api.job.IOperatorEnvironment;
import org.apache.hyracks.api.job.JobFlag;
@@ -51,5 +52,7 @@
Object getSharedObject();
+ public byte[] getJobParameter(byte[] name, int start, int length) throws HyracksException;
+
Set<JobFlag> getJobFlags();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterGraph.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterGraph.java
index b64e2d5..8432a6f 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterGraph.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterGraph.java
@@ -24,14 +24,15 @@
import java.util.List;
import java.util.Map;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
public class ActivityClusterGraph implements Serializable {
private static final long serialVersionUID = 1L;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterId.java
index e0c5279..46ebf4d 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterId.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterId.java
@@ -23,17 +23,10 @@
public final class ActivityClusterId implements Serializable {
private static final long serialVersionUID = 1L;
- private final JobId jobId;
-
private final int id;
- public ActivityClusterId(JobId jobId, int id) {
- this.jobId = jobId;
+ public ActivityClusterId(int id) {
this.id = id;
- }
-
- public JobId getJobId() {
- return jobId;
}
public int getId() {
@@ -45,7 +38,6 @@
final int prime = 31;
int result = 1;
result = prime * result + id;
- result = prime * result + ((jobId == null) ? 0 : jobId.hashCode());
return result;
}
@@ -64,18 +56,11 @@
if (id != other.id) {
return false;
}
- if (jobId == null) {
- if (other.jobId != null) {
- return false;
- }
- } else if (!jobId.equals(other.jobId)) {
- return false;
- }
return true;
}
@Override
public String toString() {
- return "ACID:" + jobId + ":" + id;
+ return "ACID:" + ":" + id;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
index 133e342..d23b944 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
@@ -25,7 +25,7 @@
import org.apache.hyracks.api.exceptions.HyracksException;
public interface IActivityClusterGraphGeneratorFactory extends Serializable {
- public IActivityClusterGraphGenerator createActivityClusterGraphGenerator(JobId jobId,
+ public IActivityClusterGraphGenerator createActivityClusterGraphGenerator(
ICCServiceContext ccServiceCtx, Set<JobFlag> jobFlags) throws HyracksException;
public JobSpecification getJobSpecification();
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobParameterByteStore.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobParameterByteStore.java
new file mode 100644
index 0000000..048db5e
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobParameterByteStore.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.job;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+public class JobParameterByteStore implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private Map<byte[], byte[]> vars;
+ private Map<String, byte[]> nonpureSingletonValues;
+ private final byte[] empty = new byte[0];
+
+ public JobParameterByteStore() {
+ vars = new HashMap<>();
+ }
+
+ public Map<byte[], byte[]> getParameterMap() {
+ return vars;
+ }
+
+ public void setParameters(Map<byte[], byte[]> map) {
+ vars = map;
+ }
+
+ public byte[] getParameterValue(byte[] name, int start, int length) {
+ for (Entry<byte[], byte[]> entry : vars.entrySet()) {
+ byte[] key = entry.getKey();
+ if (key.length == length) {
+ boolean matched = true;
+ for (int j = 0; j < length; j++) {
+ if (key[j] != name[j + start]) {
+ matched = false;
+ break;
+ }
+ }
+ if (matched) {
+ return entry.getValue();
+ }
+ }
+ }
+ return empty;
+ }
+
+ public synchronized byte[] getNonpureSingletonValue(String functionName) {
+ byte[] value = nonpureSingletonValues.get(functionName);
+ if (value == null) {
+ nonpureSingletonValues.put(functionName, value);
+ }
+ return value;
+ }
+
+}
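The linear scan in getParameterValue exists because byte[] keys in a HashMap are compared by identity rather than by content, so Map.get with a freshly built key would miss. A small stand-alone illustration of that point follows; the lookup helper mirrors the store's matching loop, and the "threshold" entry is illustrative only.

    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;

    // Why JobParameterByteStore scans entries instead of calling Map.get():
    // byte[] uses Object identity for hashCode/equals, so equal contents are
    // still different keys to a HashMap.
    public class ByteKeyLookupSketch {
        static byte[] lookup(Map<byte[], byte[]> vars, byte[] name, int start, int length) {
            for (Map.Entry<byte[], byte[]> entry : vars.entrySet()) {
                byte[] key = entry.getKey();
                if (key.length != length) {
                    continue;
                }
                boolean matched = true;
                for (int j = 0; j < length; j++) {
                    if (key[j] != name[j + start]) {
                        matched = false;
                        break;
                    }
                }
                if (matched) {
                    return entry.getValue();
                }
            }
            return new byte[0];
        }

        public static void main(String[] args) {
            Map<byte[], byte[]> vars = new HashMap<>();
            vars.put("threshold".getBytes(StandardCharsets.UTF_8), "42".getBytes(StandardCharsets.UTF_8));

            byte[] probe = "threshold".getBytes(StandardCharsets.UTF_8); // equal content, different array
            System.out.println(vars.get(probe));                         // null: identity-based key lookup misses
            System.out.println(new String(lookup(vars, probe, 0, probe.length), StandardCharsets.UTF_8)); // 42
        }
    }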
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
index 327c422..2e4bbaa 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -53,6 +53,7 @@
private static final Logger LOGGER = Logger.getLogger(ClientInterfaceIPCI.class.getName());
private final ClusterControllerService ccs;
private final JobIdFactory jobIdFactory;
+ private long predistributedId = 0;
ClientInterfaceIPCI(ClusterControllerService ccs, JobIdFactory jobIdFactory) {
this.ccs = ccs;
@@ -85,14 +86,14 @@
case DISTRIBUTE_JOB:
HyracksClientInterfaceFunctions.DistributeJobFunction djf =
(HyracksClientInterfaceFunctions.DistributeJobFunction) fn;
- ccs.getWorkQueue().schedule(new DistributeJobWork(ccs, djf.getACGGFBytes(), jobIdFactory,
-         new IPCResponder<JobId>(handle, mid)));
+ ccs.getWorkQueue().schedule(new DistributeJobWork(ccs, djf.getACGGFBytes(), predistributedId++,
+         new IPCResponder<Long>(handle, mid)));
break;
case DESTROY_JOB:
HyracksClientInterfaceFunctions.DestroyJobFunction dsjf =
(HyracksClientInterfaceFunctions.DestroyJobFunction) fn;
- ccs.getWorkQueue()
-         .schedule(new DestroyJobWork(ccs, dsjf.getJobId(), new IPCResponder<JobId>(handle, mid)));
+ ccs.getWorkQueue().schedule(
+         new DestroyJobWork(ccs, dsjf.getPredistributedId(), new IPCResponder<Long>(handle, mid)));
break;
case CANCEL_JOB:
HyracksClientInterfaceFunctions.CancelJobFunction cjf =
@@ -103,8 +104,14 @@
case START_JOB:
HyracksClientInterfaceFunctions.StartJobFunction sjf =
(HyracksClientInterfaceFunctions.StartJobFunction) fn;
- ccs.getWorkQueue().schedule(new JobStartWork(ccs, sjf.getDeploymentId(), sjf.getACGGFBytes(),
-         sjf.getJobFlags(), sjf.getJobId(), new IPCResponder<JobId>(handle, mid), jobIdFactory));
+ long id = sjf.getpredistributedId();
+ byte[] acggfBytes = null;
+ if (id == -1) {
+ //The job is new
+ acggfBytes = sjf.getACGGFBytes();
+ }
+ ccs.getWorkQueue().schedule(new JobStartWork(ccs, sjf.getDeploymentId(), acggfBytes, sjf.getJobFlags(),
+         jobIdFactory, sjf.getJobParameters(), new IPCResponder<JobId>(handle, mid), id));
break;
case GET_DATASET_DIRECTORY_SERIVICE_INFO:
ccs.getWorkQueue().schedule(
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
index 350984c..edae49f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -102,7 +102,8 @@
case DISTRIBUTED_JOB_FAILURE:
CCNCFunctions.ReportDistributedJobFailureFunction rdjf =
(CCNCFunctions.ReportDistributedJobFailureFunction) fn;
- ccs.getWorkQueue().schedule(new DistributedJobFailureWork(rdjf.getJobId(), rdjf.getNodeId()));
+ ccs.getWorkQueue()
+         .schedule(new DistributedJobFailureWork(rdjf.getPredistributedId(), rdjf.getNodeId()));
break;
case REGISTER_PARTITION_PROVIDER:
CCNCFunctions.RegisterPartitionProviderFunction rppf =
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index dfc79ed..7a8a4ae 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -50,7 +50,9 @@
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobIdFactory;
+import org.apache.hyracks.api.job.JobParameterByteStore;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.api.topology.ClusterTopology;
@@ -109,6 +111,8 @@
private final PreDistributedJobStore preDistributedJobStore = new PreDistributedJobStore();
+ private final Map<JobId, JobParameterByteStore> jobParameterByteStoreMap = new HashMap<>();
+
private final WorkQueue workQueue;
private ExecutorService executor;
@@ -164,8 +168,8 @@
final ClusterTopology topology = computeClusterTopology(ccConfig);
ccContext = new ClusterControllerContext(topology);
sweeper = new DeadNodeSweeper();
- datasetDirectoryService = new DatasetDirectoryService(ccConfig.getResultTTL(),
-         ccConfig.getResultSweepThreshold(), preDistributedJobStore);
+ datasetDirectoryService =
+         new DatasetDirectoryService(ccConfig.getResultTTL(), ccConfig.getResultSweepThreshold());
deploymentRunMap = new HashMap<>();
stateDumpRunMap = new HashMap<>();
@@ -352,6 +356,19 @@
return preDistributedJobStore;
}
+ public void removeJobParameterByteStore(JobId jobId) throws HyracksException {
+ jobParameterByteStoreMap.remove(jobId);
+ }
+
+ public JobParameterByteStore createOrGetJobParameterByteStore(JobId jobId) throws HyracksException {
+ JobParameterByteStore jpbs = jobParameterByteStoreMap.get(jobId);
+ if (jpbs == null) {
+ jpbs = new JobParameterByteStore();
+ jobParameterByteStoreMap.put(jobId, jpbs);
+ }
+ return jpbs;
+ }
+
public IResourceManager getResourceManager() {
return resourceManager;
}
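createOrGetJobParameterByteStore is a plain get-or-create on the new jobParameterByteStoreMap: the first task start of a run creates the store, later lookups reuse it. For comparison only, a stand-alone sketch of the same pattern expressed with Map.computeIfAbsent, using String keys and a StringBuilder as stand-ins for JobId and JobParameterByteStore.

    import java.util.HashMap;
    import java.util.Map;

    // Stand-alone demonstration of the get-or-create pattern used above,
    // with stand-in types instead of JobId/JobParameterByteStore.
    public class GetOrCreateSketch {
        static final Map<String, StringBuilder> STORES = new HashMap<>();

        static StringBuilder createOrGet(String jobId) {
            // Equivalent to the explicit null check in the hunk above.
            return STORES.computeIfAbsent(jobId, id -> new StringBuilder());
        }

        public static void main(String[] args) {
            // Repeated calls for the same job return the same store instance.
            System.out.println(createOrGet("JID:1") == createOrGet("JID:1")); // true
            System.out.println(createOrGet("JID:1") == createOrGet("JID:2")); // false
        }
    }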
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java
index 117621f..9e5a1a7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java
@@ -26,52 +26,47 @@
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.ActivityClusterGraph;
-import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
public class PreDistributedJobStore {
- private final Map<JobId, PreDistributedJobDescriptor> preDistributedJobDescriptorMap;
+ private final Map<Long, PreDistributedJobDescriptor> preDistributedJobDescriptorMap;
public PreDistributedJobStore() {
preDistributedJobDescriptorMap = new Hashtable<>();
}
- public void addDistributedJobDescriptor(JobId jobId, ActivityClusterGraph activityClusterGraph,
+ public void addDistributedJobDescriptor(long predestributedId, ActivityClusterGraph activityClusterGraph,
        JobSpecification jobSpecification, Set<Constraint> activityClusterGraphConstraints)
        throws HyracksException {
- if (preDistributedJobDescriptorMap.get(jobId) != null) {
- throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId);
+ if (preDistributedJobDescriptorMap.get(predestributedId) != null) {
+ throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, predestributedId);
}
PreDistributedJobDescriptor descriptor =
        new PreDistributedJobDescriptor(activityClusterGraph, jobSpecification, activityClusterGraphConstraints);
- preDistributedJobDescriptorMap.put(jobId, descriptor);
+ preDistributedJobDescriptorMap.put(predestributedId, descriptor);
}
- public void checkForExistingDistributedJobDescriptor(JobId jobId) throws HyracksException {
- if (preDistributedJobDescriptorMap.get(jobId) != null) {
- throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId);
+ public void checkForExistingDistributedJobDescriptor(long predestributedId) throws HyracksException {
+ if (preDistributedJobDescriptorMap.get(predestributedId) != null) {
+ throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, predestributedId);
}
}
- public PreDistributedJobDescriptor getDistributedJobDescriptor(JobId jobId) throws HyracksException {
- PreDistributedJobDescriptor descriptor = preDistributedJobDescriptorMap.get(jobId);
+ public PreDistributedJobDescriptor getDistributedJobDescriptor(long predestributedId) throws HyracksException {
+ PreDistributedJobDescriptor descriptor = preDistributedJobDescriptorMap.get(predestributedId);
if (descriptor == null) {
- throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId);
+ throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, predestributedId);
}
return descriptor;
}
- public boolean jobIsPredistributed(JobId jobId) {
- return preDistributedJobDescriptorMap.get(jobId) != null;
- }
-
- public void removeDistributedJobDescriptor(JobId jobId) throws HyracksException {
- PreDistributedJobDescriptor descriptor = preDistributedJobDescriptorMap.get(jobId);
+ public void removeDistributedJobDescriptor(long predestributedId) throws HyracksException {
+ PreDistributedJobDescriptor descriptor = preDistributedJobDescriptorMap.get(predestributedId);
if (descriptor == null) {
- throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId);
+ throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, predestributedId);
}
}
- preDistributedJobDescriptorMap.remove(jobId);
+ preDistributedJobDescriptorMap.remove(predestributedId);
}
public class PreDistributedJobDescriptor {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 7a9306c..a2b667c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -42,7 +42,6 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
-import org.apache.hyracks.control.cc.PreDistributedJobStore;
import org.apache.hyracks.control.common.dataset.ResultStateSweeper;
import org.apache.hyracks.control.common.work.IResultCallback;
@@ -63,14 +62,10 @@
private final Map<JobId, JobResultInfo> jobResultLocations;
- private final PreDistributedJobStore preDistributedJobStore;
-
- public DatasetDirectoryService(long resultTTL, long resultSweepThreshold,
- PreDistributedJobStore preDistributedJobStore) {
+ public DatasetDirectoryService(long resultTTL, long resultSweepThreshold) {
this.resultTTL = resultTTL;
this.resultSweepThreshold = resultSweepThreshold;
- this.preDistributedJobStore = preDistributedJobStore;
- jobResultLocations = new LinkedHashMap<>();
+ jobResultLocations = new LinkedHashMap<JobId, JobResultInfo>();
}
@Override
@@ -186,9 +181,6 @@
@Override
public synchronized long getResultTimestamp(JobId jobId) {
- if (preDistributedJobStore.jobIsPredistributed(jobId)) {
- return -1;
- }
return getState(jobId).getTimestamp();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index a3078b6..d4214c4 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -77,7 +77,7 @@
private final PartitionConstraintSolver solver;
- private final boolean predistributed;
+ private final long predistributedId;
private final Map<PartitionId, TaskCluster>
partitionProducingTaskClusterMap;
@@ -88,10 +88,10 @@
private boolean cancelled = false;
public JobExecutor(ClusterControllerService ccs, JobRun jobRun,
Collection<Constraint> constraints,
- boolean predistributed) {
+ long predistributedId) {
this.ccs = ccs;
this.jobRun = jobRun;
- this.predistributed = predistributed;
+ this.predistributedId = predistributedId;
solver = new PartitionConstraintSolver();
partitionProducingTaskClusterMap = new HashMap<>();
inProgressTaskClusters = new HashSet<>();
@@ -100,7 +100,7 @@
}
public boolean isPredistributed() {
- return predistributed;
+ return predistributedId != -1;
}
public JobRun getJobRun() {
@@ -503,7 +503,7 @@
new HashMap<>(jobRun.getConnectorPolicyMap());
INodeManager nodeManager = ccs.getNodeManager();
try {
- byte[] acgBytes = predistributed ? null : JavaSerializationUtils.serialize(acg);
+ byte[] acgBytes = isPredistributed() ? null : JavaSerializationUtils.serialize(acg);
for (Map.Entry<String, List<TaskAttemptDescriptor>> entry : taskAttemptMap.entrySet()) {
String nodeId = entry.getKey();
final List<TaskAttemptDescriptor> taskDescriptors = entry.getValue();
@@ -516,7 +516,8 @@
}
byte[] jagBytes = changed ? acgBytes : null;
node.getNodeController().startTasks(deploymentId, jobId, jagBytes, taskDescriptors,
- connectorPolicies, jobRun.getFlags());
+ connectorPolicies, jobRun.getFlags(),
+ ccs.createOrGetJobParameterByteStore(jobId).getParameterMap(), predistributedId);
}
}
} catch (Exception e) {
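The executor now uses -1 as the sentinel for "no pre-distributed id" and skips serializing the ActivityClusterGraph whenever the id is set. A small hedged sketch of that convention, with illustrative names:

    // Sketch of the sentinel convention used above; -1 means "not pre-distributed".
    final class PredistributionInfo {
        static final long NOT_PREDISTRIBUTED = -1L;
        private final long predistributedId;

        PredistributionInfo(long predistributedId) {
            this.predistributedId = predistributedId;
        }

        boolean isPredistributed() {
            return predistributedId != NOT_PREDISTRIBUTED;
        }

        // The plan bytes only need to ship to the NCs when the graph was not distributed ahead of time.
        byte[] planBytesToShip(byte[] serializedAcg) {
            return isPredistributed() ? null : serializedAcg;
        }
    }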
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index c6d90a7..66f4a96 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -142,6 +142,7 @@
@Override
public void prepareComplete(JobRun run, JobStatus status, List<Exception>
exceptions) throws HyracksException {
+ ccs.removeJobParameterByteStore(run.getJobId());
checkJob(run);
if (status == JobStatus.FAILURE_BEFORE_EXECUTION) {
run.setPendingStatus(JobStatus.FAILURE, exceptions);
@@ -301,9 +302,7 @@
CCServiceContext serviceCtx = ccs.getContext();
JobSpecification spec = run.getJobSpecification();
- if (!run.getExecutor().isPredistributed()) {
- serviceCtx.notifyJobCreation(jobId, spec);
- }
+ serviceCtx.notifyJobCreation(jobId, spec);
run.setStatus(JobStatus.RUNNING, null);
executeJobInternal(run);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
index 95a6d9b..641a636 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
@@ -116,19 +116,21 @@
//Run a Pre-distributed job by passing the JobId
public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId, Set<JobFlag> jobFlags,
- PreDistributedJobDescriptor distributedJobDescriptor)
+ PreDistributedJobDescriptor distributedJobDescriptor, Map<byte[], byte[]> jobParameters,
+ long predistributedId)
throws HyracksException {
this(deploymentId, jobId, jobFlags, distributedJobDescriptor.getJobSpecification(),
distributedJobDescriptor.getActivityClusterGraph());
+ ccs.createOrGetJobParameterByteStore(jobId).setParameters(jobParameters);
Set<Constraint> constaints = distributedJobDescriptor.getActivityClusterGraphConstraints();
- this.scheduler = new JobExecutor(ccs, this, constaints, true);
+ this.scheduler = new JobExecutor(ccs, this, constaints, predistributedId);
}
//Run a new job by creating an ActivityClusterGraph
public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId,
IActivityClusterGraphGeneratorFactory acggf, IActivityClusterGraphGenerator acgg, Set<JobFlag> jobFlags) {
this(deploymentId, jobId, jobFlags, acggf.getJobSpecification(), acgg.initialize());
- this.scheduler = new JobExecutor(ccs, this, acgg.getConstraints(), false);
+ this.scheduler = new JobExecutor(ccs, this, acgg.getConstraints(), -1);
}
public DeploymentId getDeploymentId() {
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java
index df98252..66cfa7f 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java
@@ -18,7 +18,6 @@
*/
package org.apache.hyracks.control.cc.work;
-import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.NodeControllerState;
import org.apache.hyracks.control.cc.cluster.INodeManager;
@@ -27,11 +26,11 @@
public class DestroyJobWork extends SynchronizableWork {
private final ClusterControllerService ccs;
- private final JobId jobId;
- private final IResultCallback<JobId> callback;
+ private final long predestributedId;
+ private final IResultCallback<Long> callback;
- public DestroyJobWork(ClusterControllerService ccs, JobId jobId,
IResultCallback<JobId> callback) {
- this.jobId = jobId;
+ public DestroyJobWork(ClusterControllerService ccs, long predestributedId,
IResultCallback<Long> callback) {
+ this.predestributedId = predestributedId;
this.ccs = ccs;
this.callback = callback;
}
@@ -39,12 +38,12 @@
@Override
protected void doRun() throws Exception {
try {
-
ccs.getPreDistributedJobStore().removeDistributedJobDescriptor(jobId);
+
ccs.getPreDistributedJobStore().removeDistributedJobDescriptor(predestributedId);
INodeManager nodeManager = ccs.getNodeManager();
for (NodeControllerState node :
nodeManager.getAllNodeControllerStates()) {
- node.getNodeController().destroyJob(jobId);
+ node.getNodeController().destroyJob(predestributedId);
}
- callback.setValue(jobId);
+ callback.setValue(predestributedId);
} catch (Exception e) {
callback.setException(e);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
index 5a57b1b..e9985bf 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
@@ -24,8 +24,6 @@
import org.apache.hyracks.api.job.IActivityClusterGraphGenerator;
import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
import org.apache.hyracks.api.job.JobFlag;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobIdFactory;
import org.apache.hyracks.api.util.JavaSerializationUtils;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.NodeControllerState;
@@ -38,12 +36,12 @@
public class DistributeJobWork extends SynchronizableWork {
private final ClusterControllerService ccs;
private final byte[] acggfBytes;
- private final JobIdFactory jobIdFactory;
- private final IResultCallback<JobId> callback;
+ private final long predestributedId;
+ private final IResultCallback<Long> callback;
- public DistributeJobWork(ClusterControllerService ccs, byte[] acggfBytes,
JobIdFactory jobIdFactory,
- IResultCallback<JobId> callback) {
- this.jobIdFactory = jobIdFactory;
+ public DistributeJobWork(ClusterControllerService ccs, byte[] acggfBytes,
long predestributedId,
+ IResultCallback<Long> callback) {
+ this.predestributedId = predestributedId;
this.ccs = ccs;
this.acggfBytes = acggfBytes;
this.callback = callback;
@@ -52,27 +50,25 @@
@Override
protected void doRun() throws Exception {
try {
- JobId jobId = jobIdFactory.create();
final CCServiceContext ccServiceCtx = ccs.getContext();
-
ccs.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(jobId);
+
ccs.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(predestributedId);
IActivityClusterGraphGeneratorFactory acggf =
(IActivityClusterGraphGeneratorFactory)
DeploymentUtils.deserialize(acggfBytes, null, ccServiceCtx);
IActivityClusterGraphGenerator acgg =
- acggf.createActivityClusterGraphGenerator(jobId,
ccServiceCtx, EnumSet.noneOf(JobFlag.class));
+ acggf.createActivityClusterGraphGenerator(ccServiceCtx,
EnumSet.noneOf(JobFlag.class));
ActivityClusterGraph acg = acgg.initialize();
- ccs.getPreDistributedJobStore().addDistributedJobDescriptor(jobId,
acg, acggf.getJobSpecification(),
+
ccs.getPreDistributedJobStore().addDistributedJobDescriptor(predestributedId,
acg,
+ acggf.getJobSpecification(),
acgg.getConstraints());
-
- ccServiceCtx.notifyJobCreation(jobId, acggf.getJobSpecification());
byte[] acgBytes = JavaSerializationUtils.serialize(acg);
INodeManager nodeManager = ccs.getNodeManager();
for (NodeControllerState node :
nodeManager.getAllNodeControllerStates()) {
- node.getNodeController().distributeJob(jobId, acgBytes);
+ node.getNodeController().distributeJob(predestributedId,
acgBytes);
}
- callback.setValue(jobId);
+ callback.setValue(predestributedId);
} catch (Exception e) {
callback.setException(e);
}
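In outline, distributing a job now registers the serialized graph under the caller-chosen id on the CC and then fans the bytes out to every node controller. A simplified sketch of that sequence; NodeProxy and the map stand in for the real Hyracks types:

    import java.util.List;
    import java.util.Map;

    // Simplified sketch of the distribute path shown above; NodeProxy stands in for INodeController.
    interface NodeProxy {
        void distributeJob(long predistributedId, byte[] acgBytes) throws Exception;
    }

    class DistributeFlowSketch {
        // Register the serialized graph under the caller-chosen id, then fan it out to every node.
        static void distribute(long predistributedId, byte[] acgBytes, Map<Long, byte[]> ccStore,
                List<NodeProxy> nodes) throws Exception {
            if (ccStore.putIfAbsent(predistributedId, acgBytes) != null) {
                throw new IllegalStateException("duplicate pre-distributed id: " + predistributedId);
            }
            for (NodeProxy node : nodes) {
                node.distributeJob(predistributedId, acgBytes);
            }
        }
    }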
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributedJobFailureWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributedJobFailureWork.java
index f7fa2a4..db4b35a 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributedJobFailureWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributedJobFailureWork.java
@@ -20,20 +20,19 @@
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.control.common.work.SynchronizableWork;
public class DistributedJobFailureWork extends SynchronizableWork {
- protected final JobId jobId;
+ protected final long predistributedId;
protected final String nodeId;
- public DistributedJobFailureWork(JobId jobId, String nodeId) {
- this.jobId = jobId;
+ public DistributedJobFailureWork(long predistributedId, String nodeId) {
+ this.predistributedId = predistributedId;
this.nodeId = nodeId;
}
@Override
public void doRun() throws HyracksException {
- throw HyracksException.create(ErrorCode.DISTRIBUTED_JOB_FAILURE,
jobId, nodeId);
+ throw HyracksException.create(ErrorCode.DISTRIBUTED_JOB_FAILURE,
predistributedId, nodeId);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
index ed82705..d5a2dde 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
@@ -18,7 +18,8 @@
*/
package org.apache.hyracks.control.cc.work;
-import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.job.IActivityClusterGraphGenerator;
@@ -37,20 +38,24 @@
public class JobStartWork extends SynchronizableWork {
private final ClusterControllerService ccs;
private final byte[] acggfBytes;
- private final EnumSet<JobFlag> jobFlags;
+ private final Set<JobFlag> jobFlags;
private final DeploymentId deploymentId;
- private final JobId preDistributedJobId;
private final IResultCallback<JobId> callback;
private final JobIdFactory jobIdFactory;
+ private final long predestributedId;
+ private final Map<byte[], byte[]> jobParameters;
public JobStartWork(ClusterControllerService ccs, DeploymentId
deploymentId, byte[] acggfBytes,
- EnumSet<JobFlag> jobFlags, JobId jobId, IResultCallback<JobId>
callback, JobIdFactory jobIdFactory) {
+ Set<JobFlag> jobFlags, JobIdFactory jobIdFactory, Map<byte[],
byte[]> jobParameters,
+ IResultCallback<JobId> callback,
+ long predestributedId) {
this.deploymentId = deploymentId;
- this.preDistributedJobId = jobId;
this.ccs = ccs;
this.acggfBytes = acggfBytes;
this.jobFlags = jobFlags;
this.callback = callback;
+ this.predestributedId = predestributedId;
+ this.jobParameters = jobParameters;
this.jobIdFactory = jobIdFactory;
}
@@ -61,19 +66,18 @@
final CCServiceContext ccServiceCtx = ccs.getContext();
JobId jobId;
JobRun run;
- if (preDistributedJobId == null) {
- jobId = jobIdFactory.create();
+ jobId = jobIdFactory.create();
+ if (predestributedId == -1) {
//Need to create the ActivityClusterGraph
IActivityClusterGraphGeneratorFactory acggf =
(IActivityClusterGraphGeneratorFactory) DeploymentUtils
.deserialize(acggfBytes, deploymentId, ccServiceCtx);
- IActivityClusterGraphGenerator acgg =
- acggf.createActivityClusterGraphGenerator(jobId,
ccServiceCtx, jobFlags);
+ IActivityClusterGraphGenerator acgg =
acggf.createActivityClusterGraphGenerator(ccServiceCtx, jobFlags);
run = new JobRun(ccs, deploymentId, jobId, acggf, acgg,
jobFlags);
} else {
- jobId = preDistributedJobId;
//ActivityClusterGraph has already been distributed
run = new JobRun(ccs, deploymentId, jobId, jobFlags,
- ccs.getPreDistributedJobStore().getDistributedJobDescriptor(jobId));
+ ccs.getPreDistributedJobStore().getDistributedJobDescriptor(predestributedId), jobParameters,
+ predestributedId);
}
jobManager.add(run);
callback.setValue(jobId);
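JobStartWork now always mints a fresh JobId and only rebuilds the graph when no pre-distributed id was supplied; otherwise it reuses the stored descriptor and seeds the caller's parameters into the run. A condensed, non-authoritative sketch of that branch, using placeholder types:

    import java.util.Map;
    import java.util.function.Supplier;

    // Condensed sketch of the branch above; Graph and the map are placeholders, not CC classes.
    class StartBranchSketch<Graph> {
        static final long NOT_PREDISTRIBUTED = -1L;

        private final Map<Long, Graph> preDistributedGraphs;

        StartBranchSketch(Map<Long, Graph> preDistributedGraphs) {
            this.preDistributedGraphs = preDistributedGraphs;
        }

        // A fresh JobId is always minted by the caller now; only the source of the graph differs.
        Graph graphForRun(long predistributedId, Supplier<Graph> buildFromSerializedFactory) {
            if (predistributedId == NOT_PREDISTRIBUTED) {
                return buildFromSerializedFactory.get(); // deserialize the factory and generate the ACG
            }
            Graph stored = preDistributedGraphs.get(predistributedId);
            if (stored == null) {
                throw new IllegalStateException("no pre-distributed job with id " + predistributedId);
            }
            return stored;
        }
    }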
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index ec8e045..813836a 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -44,7 +44,7 @@
public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String
nodeId, List<Exception> exceptions)
throws Exception;
- public void notifyDistributedJobFailure(JobId jobId, String nodeId) throws
Exception;
+ public void notifyDistributedJobFailure(long predestributedId, String
nodeId) throws Exception;
public void notifyJobletCleanup(JobId jobId, String nodeId) throws
Exception;
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index a10f8f0..a3d7719 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -19,7 +19,6 @@
package org.apache.hyracks.control.common.base;
import java.net.URL;
-import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -38,7 +37,7 @@
public interface INodeController {
public void startTasks(DeploymentId deploymentId, JobId jobId, byte[]
planBytes,
List<TaskAttemptDescriptor> taskDescriptors,
Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies,
- Set<JobFlag> flags) throws Exception;
+ Set<JobFlag> flags, Map<byte[], byte[]> jobParameters, long
predistributedId) throws Exception;
public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws
Exception;
@@ -50,9 +49,9 @@
public void undeployBinary(DeploymentId deploymentId) throws Exception;
- public void distributeJob(JobId jobId, byte[] planBytes) throws Exception;
+ public void distributeJob(long predistributedId, byte[] planBytes) throws
Exception;
- public void destroyJob(JobId jobId) throws Exception;
+ public void destroyJob(long predestributedId) throws Exception;
public void dumpState(String stateDumpId) throws Exception;
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index d42c4a8..7e40fbf 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -288,11 +288,11 @@
public static class ReportDistributedJobFailureFunction extends Function {
private static final long serialVersionUID = 1L;
- private final JobId jobId;
+ private final long predestributedId;
private final String nodeId;
- public ReportDistributedJobFailureFunction(JobId jobId, String nodeId)
{
- this.jobId = jobId;
+ public ReportDistributedJobFailureFunction(long predestributedId,
String nodeId) {
+ this.predestributedId = predestributedId;
this.nodeId = nodeId;
}
@@ -301,8 +301,8 @@
return FunctionId.DISTRIBUTED_JOB_FAILURE;
}
- public JobId getJobId() {
- return jobId;
+ public long getPredistributedId() {
+ return predestributedId;
}
public String getNodeId() {
@@ -668,12 +668,12 @@
public static class DistributeJobFunction extends Function {
private static final long serialVersionUID = 1L;
- private final JobId jobId;
+ private final long predistributedId;
private final byte[] acgBytes;
- public DistributeJobFunction(JobId jobId, byte[] acgBytes) {
- this.jobId = jobId;
+ public DistributeJobFunction(long predistributedId, byte[] acgBytes) {
+ this.predistributedId = predistributedId;
this.acgBytes = acgBytes;
}
@@ -682,8 +682,8 @@
return FunctionId.DISTRIBUTE_JOB;
}
- public JobId getJobId() {
- return jobId;
+ public long getPredistributedId() {
+ return predistributedId;
}
public byte[] getacgBytes() {
@@ -694,10 +694,10 @@
public static class DestroyJobFunction extends Function {
private static final long serialVersionUID = 1L;
- private final JobId jobId;
+ private final long predistributedId;
- public DestroyJobFunction(JobId jobId) {
- this.jobId = jobId;
+ public DestroyJobFunction(long predistributedId) {
+ this.predistributedId = predistributedId;
}
@Override
@@ -705,8 +705,8 @@
return FunctionId.DESTROY_JOB;
}
- public JobId getJobId() {
- return jobId;
+ public long getPredistributedId() {
+ return predistributedId;
}
}
@@ -719,16 +719,21 @@
private final List<TaskAttemptDescriptor> taskDescriptors;
private final Map<ConnectorDescriptorId, IConnectorPolicy>
connectorPolicies;
private final Set<JobFlag> flags;
+ private final Map<byte[], byte[]> jobParameters;
+ private final long predistributedId;
public StartTasksFunction(DeploymentId deploymentId, JobId jobId,
byte[] planBytes,
List<TaskAttemptDescriptor> taskDescriptors,
- Map<ConnectorDescriptorId, IConnectorPolicy>
connectorPolicies, Set<JobFlag> flags) {
+ Map<ConnectorDescriptorId, IConnectorPolicy>
connectorPolicies, Set<JobFlag> flags,
+ Map<byte[], byte[]> jobParameters, long predistributedId) {
this.deploymentId = deploymentId;
this.jobId = jobId;
this.planBytes = planBytes;
this.taskDescriptors = taskDescriptors;
this.connectorPolicies = connectorPolicies;
this.flags = flags;
+ this.jobParameters = jobParameters;
+ this.predistributedId = predistributedId;
}
@Override
@@ -740,8 +745,16 @@
return deploymentId;
}
+ public long getPredistributedId() {
+ return predistributedId;
+ }
+
public JobId getJobId() {
return jobId;
+ }
+
+ public Map<byte[], byte[]> getJobParameters() {
+ return jobParameters;
}
public byte[] getPlanBytes() {
@@ -804,7 +817,29 @@
flags.add(JobFlag.values()[(dis.readInt())]);
}
- return new StartTasksFunction(deploymentId, jobId, planBytes, taskDescriptors, connectorPolicies, flags);
+ // read job parameters
+ int runTimeVarsSize = dis.readInt();
+ Map<byte[], byte[]> jobParameters = new HashMap<>();
+ for (int i = 0; i < runTimeVarsSize; i++) {
+ int nameLength = dis.readInt();
+ byte[] nameBytes = null;
+ if (nameLength >= 0) {
+ nameBytes = new byte[nameLength];
+ dis.readFully(nameBytes, 0, nameLength);
+ }
+ int valueLength = dis.readInt();
+ byte[] valueBytes = null;
+ if (valueLength >= 0) {
+ valueBytes = new byte[valueLength];
+ dis.readFully(valueBytes, 0, valueLength);
+ }
+ jobParameters.put(nameBytes, valueBytes);
+ }
+
+ long predistributedId = dis.readLong();
+
+ return new StartTasksFunction(deploymentId, jobId, planBytes, taskDescriptors, connectorPolicies, flags,
+ jobParameters, predistributedId);
}
public static void serialize(OutputStream out, Object object) throws
Exception {
@@ -842,6 +877,19 @@
for (JobFlag flag : fn.flags) {
dos.writeInt(flag.ordinal());
}
+
+ //write job parameters
+ dos.writeInt(fn.jobParameters.size());
+ for (Entry<byte[], byte[]> entry : fn.jobParameters.entrySet()) {
+ dos.writeInt(entry.getKey().length);
+ dos.write(entry.getKey(), 0, entry.getKey().length);
+ dos.writeInt(entry.getValue().length);
+ dos.write(entry.getValue(), 0, entry.getValue().length);
+ }
+
+ //write predistributed id
+ dos.writeLong(fn.predistributedId);
+
}
}
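The new fields extend the StartTasks wire format with an entry count, length-prefixed key/value byte arrays, and finally the pre-distributed id as a long. A self-contained round-trip sketch of that framing using plain java.io rather than the CCNCFunctions helpers; readFully is used here to avoid short reads:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    // Round-trip sketch of the framing added above: entry count, then length-prefixed
    // key/value byte arrays, then the pre-distributed id as a long.
    public class JobParameterFramingSketch {
        static byte[] write(Map<byte[], byte[]> params, long predistributedId) throws IOException {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream dos = new DataOutputStream(bos);
            dos.writeInt(params.size());
            for (Map.Entry<byte[], byte[]> e : params.entrySet()) {
                dos.writeInt(e.getKey().length);
                dos.write(e.getKey());
                dos.writeInt(e.getValue().length);
                dos.write(e.getValue());
            }
            dos.writeLong(predistributedId);
            dos.flush();
            return bos.toByteArray();
        }

        static Map<byte[], byte[]> read(byte[] frame) throws IOException {
            DataInputStream dis = new DataInputStream(new ByteArrayInputStream(frame));
            int count = dis.readInt();
            Map<byte[], byte[]> params = new HashMap<>();
            for (int i = 0; i < count; i++) {
                byte[] name = new byte[dis.readInt()];
                dis.readFully(name);
                byte[] value = new byte[dis.readInt()];
                dis.readFully(value);
                params.put(name, value);
            }
            long predistributedId = dis.readLong();
            System.out.println("predistributedId = " + predistributedId);
            return params;
        }

        public static void main(String[] args) throws IOException {
            Map<byte[], byte[]> params = new HashMap<>();
            params.put("param".getBytes(), "value".getBytes());
            Map<byte[], byte[]> back = read(write(params, 42L));
            System.out.println("entries read back: " + back.size());
        }
    }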
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 4707487..f0443b5 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -18,8 +18,6 @@
*/
package org.apache.hyracks.control.common.ipc;
-import static org.apache.hyracks.control.common.ipc.CCNCFunctions.*;
-
import java.net.InetSocketAddress;
import java.util.List;
import java.util.logging.Logger;
@@ -33,6 +31,24 @@
import org.apache.hyracks.control.common.controllers.NodeRegistration;
import org.apache.hyracks.control.common.deployment.DeploymentStatus;
import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
+import
org.apache.hyracks.control.common.ipc.CCNCFunctions.GetNodeControllersInfoFunction;
+import
org.apache.hyracks.control.common.ipc.CCNCFunctions.NodeHeartbeatFunction;
+import
org.apache.hyracks.control.common.ipc.CCNCFunctions.NotifyDeployBinaryFunction;
+import
org.apache.hyracks.control.common.ipc.CCNCFunctions.NotifyJobletCleanupFunction;
+import
org.apache.hyracks.control.common.ipc.CCNCFunctions.NotifyTaskCompleteFunction;
+import
org.apache.hyracks.control.common.ipc.CCNCFunctions.NotifyTaskFailureFunction;
+import
org.apache.hyracks.control.common.ipc.CCNCFunctions.RegisterNodeFunction;
+import
org.apache.hyracks.control.common.ipc.CCNCFunctions.RegisterPartitionProviderFunction;
+import
org.apache.hyracks.control.common.ipc.CCNCFunctions.RegisterPartitionRequestFunction;
+import
org.apache.hyracks.control.common.ipc.CCNCFunctions.RegisterResultPartitionLocationFunction;
+import
org.apache.hyracks.control.common.ipc.CCNCFunctions.ReportDistributedJobFailureFunction;
+import
org.apache.hyracks.control.common.ipc.CCNCFunctions.ReportProfileFunction;
+import
org.apache.hyracks.control.common.ipc.CCNCFunctions.ReportResultPartitionWriteCompletionFunction;
+import
org.apache.hyracks.control.common.ipc.CCNCFunctions.SendApplicationMessageFunction;
+import
org.apache.hyracks.control.common.ipc.CCNCFunctions.ShutdownResponseFunction;
+import
org.apache.hyracks.control.common.ipc.CCNCFunctions.StateDumpResponseFunction;
+import
org.apache.hyracks.control.common.ipc.CCNCFunctions.ThreadDumpResponseFunction;
+import
org.apache.hyracks.control.common.ipc.CCNCFunctions.UnregisterNodeFunction;
import org.apache.hyracks.control.common.job.PartitionDescriptor;
import org.apache.hyracks.control.common.job.PartitionRequest;
import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
@@ -150,9 +166,8 @@
}
@Override
- public void notifyDistributedJobFailure(JobId jobId, String nodeId) throws
Exception {
- ReportDistributedJobFailureFunction fn = new
ReportDistributedJobFailureFunction(
- jobId, nodeId);
+ public void notifyDistributedJobFailure(long predestributedId, String
nodeId) throws Exception {
+ ReportDistributedJobFailureFunction fn = new
ReportDistributedJobFailureFunction(predestributedId, nodeId);
ensureIpcHandle().send(-1, fn, null);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 68a5b76..e703acc 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -18,8 +18,6 @@
*/
package org.apache.hyracks.control.common.ipc;
-import static org.apache.hyracks.control.common.ipc.CCNCFunctions.*;
-
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.List;
@@ -37,6 +35,18 @@
import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.api.partitions.PartitionId;
import org.apache.hyracks.control.common.base.INodeController;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.AbortTasksFunction;
+import
org.apache.hyracks.control.common.ipc.CCNCFunctions.CleanupJobletFunction;
+import
org.apache.hyracks.control.common.ipc.CCNCFunctions.DeployBinaryFunction;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.DestroyJobFunction;
+import
org.apache.hyracks.control.common.ipc.CCNCFunctions.DistributeJobFunction;
+import
org.apache.hyracks.control.common.ipc.CCNCFunctions.ReportPartitionAvailabilityFunction;
+import
org.apache.hyracks.control.common.ipc.CCNCFunctions.SendApplicationMessageFunction;
+import
org.apache.hyracks.control.common.ipc.CCNCFunctions.ShutdownRequestFunction;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.StartTasksFunction;
+import
org.apache.hyracks.control.common.ipc.CCNCFunctions.StateDumpRequestFunction;
+import
org.apache.hyracks.control.common.ipc.CCNCFunctions.ThreadDumpRequestFunction;
+import
org.apache.hyracks.control.common.ipc.CCNCFunctions.UnDeployBinaryFunction;
import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
import org.apache.hyracks.ipc.impl.IPCSystem;
@@ -60,9 +70,9 @@
@Override
public void startTasks(DeploymentId deploymentId, JobId jobId, byte[]
planBytes,
List<TaskAttemptDescriptor> taskDescriptors,
Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies,
- Set<JobFlag> flags) throws Exception {
+ Set<JobFlag> flags, Map<byte[], byte[]> jobParameters, long
predistributedId) throws Exception {
StartTasksFunction stf = new StartTasksFunction(deploymentId, jobId,
planBytes,
- taskDescriptors, connectorPolicies, flags);
+ taskDescriptors, connectorPolicies, flags, jobParameters,
predistributedId);
ensureIpcHandle().send(-1, stf, null);
}
@@ -98,14 +108,14 @@
}
@Override
- public void distributeJob(JobId jobId, byte[] planBytes) throws Exception {
- DistributeJobFunction fn = new DistributeJobFunction(jobId, planBytes);
+ public void distributeJob(long predistributedId, byte[] planBytes) throws
Exception {
+ DistributeJobFunction fn = new DistributeJobFunction(predistributedId,
planBytes);
ensureIpcHandle().send(-1, fn, null);
}
@Override
- public void destroyJob(JobId jobId) throws Exception {
- DestroyJobFunction fn = new DestroyJobFunction(jobId);
+ public void destroyJob(long predistributedId) throws Exception {
+ DestroyJobFunction fn = new DestroyJobFunction(predistributedId);
ensureIpcHandle().send(-1, fn, null);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index c416942..4ab14f4 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -62,8 +62,10 @@
return;
case START_TASKS:
CCNCFunctions.StartTasksFunction stf =
(CCNCFunctions.StartTasksFunction) fn;
- ncs.getWorkQueue().schedule(new StartTasksWork(ncs,
stf.getDeploymentId(), stf.getJobId(),
- stf.getPlanBytes(), stf.getTaskDescriptors(),
stf.getConnectorPolicies(), stf.getFlags()));
+ ncs.getWorkQueue()
+ .schedule(new StartTasksWork(ncs,
stf.getDeploymentId(), stf.getJobId(), stf.getPlanBytes(),
+ stf.getTaskDescriptors(),
stf.getConnectorPolicies(), stf.getFlags(),
+ stf.getJobParameters(),
stf.getPredistributedId()));
return;
case ABORT_TASKS:
CCNCFunctions.AbortTasksFunction atf =
(CCNCFunctions.AbortTasksFunction) fn;
@@ -103,12 +105,12 @@
case DISTRIBUTE_JOB:
CCNCFunctions.DistributeJobFunction djf =
(CCNCFunctions.DistributeJobFunction) fn;
- ncs.getWorkQueue().schedule(new DistributeJobWork(ncs,
djf.getJobId(), djf.getacgBytes()));
+ ncs.getWorkQueue().schedule(new DistributeJobWork(ncs,
djf.getPredistributedId(), djf.getacgBytes()));
return;
case DESTROY_JOB:
CCNCFunctions.DestroyJobFunction dsjf =
(CCNCFunctions.DestroyJobFunction) fn;
- ncs.getWorkQueue().schedule(new DestroyJobWork(ncs,
dsjf.getJobId()));
+ ncs.getWorkQueue().schedule(new DestroyJobWork(ncs,
dsjf.getPredistributedId()));
return;
case STATE_DUMP_REQUEST:
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index a426d47..eab40d6 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -30,6 +30,7 @@
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
@@ -54,6 +55,7 @@
import org.apache.hyracks.api.io.IODeviceHandle;
import org.apache.hyracks.api.job.ActivityClusterGraph;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobParameterByteStore;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
import org.apache.hyracks.api.service.IControllerService;
@@ -125,7 +127,9 @@
private final Map<JobId, Joblet> jobletMap;
- private final Map<JobId, ActivityClusterGraph>
preDistributedJobActivityClusterGraphMap;
+ private final Map<Long, ActivityClusterGraph>
preDistributedJobActivityClusterGraphMap;
+
+ private final Map<JobId, JobParameterByteStore> jobParameterByteStoreMap =
new HashMap<>();
private ExecutorService executor;
@@ -417,28 +421,42 @@
return jobletMap;
}
- public void storeActivityClusterGraph(JobId jobId, ActivityClusterGraph acg) throws HyracksException {
- if (preDistributedJobActivityClusterGraphMap.get(jobId) != null) {
- throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId);
- }
- preDistributedJobActivityClusterGraphMap.put(jobId, acg);
+ public void removeJobParameterByteStore(JobId jobId) {
+ jobParameterByteStoreMap.remove(jobId);
}
- public void removeActivityClusterGraph(JobId jobId) throws HyracksException {
- if (preDistributedJobActivityClusterGraphMap.get(jobId) == null) {
- throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId);
+ public JobParameterByteStore createOrGetJobParameterByteStore(JobId jobId) throws HyracksException {
+ JobParameterByteStore jpbs = jobParameterByteStoreMap.get(jobId);
+ if (jpbs == null) {
+ jpbs = new JobParameterByteStore();
+ jobParameterByteStoreMap.put(jobId, jpbs);
}
- preDistributedJobActivityClusterGraphMap.remove(jobId);
+ return jpbs;
}
- public void checkForDuplicateDistributedJob(JobId jobId) throws HyracksException {
- if (preDistributedJobActivityClusterGraphMap.get(jobId) != null) {
- throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId);
+
+ public void storeActivityClusterGraph(long predistributedId, ActivityClusterGraph acg) throws HyracksException {
+ if (preDistributedJobActivityClusterGraphMap.get(predistributedId) != null) {
+ throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, predistributedId);
+ }
+ preDistributedJobActivityClusterGraphMap.put(predistributedId, acg);
+ }
+
+ public void removeActivityClusterGraph(long predistributedId) throws HyracksException {
+ if (preDistributedJobActivityClusterGraphMap.get(predistributedId) == null) {
+ throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, predistributedId);
+ }
+ preDistributedJobActivityClusterGraphMap.remove(predistributedId);
+ }
+
+ public void checkForDuplicateDistributedJob(long predistributedId) throws HyracksException {
+ if (preDistributedJobActivityClusterGraphMap.get(predistributedId) != null) {
+ throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, predistributedId);
}
}
- public ActivityClusterGraph getActivityClusterGraph(JobId jobId) throws HyracksException {
- return preDistributedJobActivityClusterGraphMap.get(jobId);
+ public ActivityClusterGraph getActivityClusterGraph(long predistributedId) throws HyracksException {
+ return preDistributedJobActivityClusterGraphMap.get(predistributedId);
}
public NetworkManager getNetworkManager() {
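The JobParameterByteStore added under hyracks-api is not shown in this excerpt; it is only visible here through setParameters, getParameterMap, and getParameterValue(name, start, length). A guess at a minimal shape inferred from those call sites, which may well differ from the class in the patch:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical shape of a job-parameter store, inferred from the call sites above.
    // The real JobParameterByteStore in the patch may differ.
    public class JobParameterByteStoreSketch {
        private Map<byte[], byte[]> parameters = new HashMap<>();
        private static final byte[] EMPTY = new byte[0];

        public void setParameters(Map<byte[], byte[]> parameters) {
            this.parameters = parameters;
        }

        public Map<byte[], byte[]> getParameterMap() {
            return parameters;
        }

        // byte[] keys hash by identity, so lookup compares contents instead of relying on Map.get.
        public byte[] getParameterValue(byte[] name, int start, int length) {
            byte[] wanted = Arrays.copyOfRange(name, start, start + length);
            for (Map.Entry<byte[], byte[]> entry : parameters.entrySet()) {
                if (Arrays.equals(entry.getKey(), wanted)) {
                    return entry.getValue();
                }
            }
            return EMPTY;
        }
    }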
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index bff2794..66b0b33 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -450,6 +450,9 @@
}
@Override
+ public byte[] getJobParameter(byte[] name, int start, int length) throws HyracksException {
+ return ncs.createOrGetJobParameterByteStore(joblet.getJobId()).getParameterValue(name, start, length);
+ }
public Set<JobFlag> getJobFlags() {
return jobFlags;
}
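With Task answering getJobParameter from the per-job store, a runtime evaluator (for example the new get-job-parameter builtin listed in this change) can read a named parameter from its task context. A stand-alone sketch against a stand-in context interface, not the real IHyracksTaskContext:

    import java.nio.charset.StandardCharsets;

    // Stand-in for the relevant slice of the task context; only getJobParameter is modeled here.
    interface TaskContextSketch {
        byte[] getJobParameter(byte[] name, int start, int length);
    }

    class GetJobParameterUsageSketch {
        // Look up a parameter by its UTF-8 name and decode the stored bytes as a string.
        static String readStringParameter(TaskContextSketch ctx, String name) {
            byte[] nameBytes = name.getBytes(StandardCharsets.UTF_8);
            byte[] value = ctx.getJobParameter(nameBytes, 0, nameBytes.length);
            return value == null || value.length == 0 ? null : new String(value, StandardCharsets.UTF_8);
        }
    }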
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
index 670ce06..03ae90c 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
@@ -51,6 +51,7 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Cleaning up after job: " + jobId);
}
+ ncs.removeJobParameterByteStore(jobId);
final List<IPartition> unregisteredPartitions = new
ArrayList<IPartition>();
ncs.getPartitionManager().unregisterPartitions(jobId,
unregisteredPartitions);
ncs.getExecutor().execute(new Runnable() {
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java
index 55dd01e..77cc37e 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java
@@ -20,7 +20,6 @@
package org.apache.hyracks.control.nc.work;
import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.control.common.work.AbstractWork;
import org.apache.hyracks.control.nc.NodeControllerService;
@@ -31,20 +30,20 @@
public class DestroyJobWork extends AbstractWork {
private final NodeControllerService ncs;
- private final JobId jobId;
+ private final long predistributedId;
- public DestroyJobWork(NodeControllerService ncs, JobId jobId) {
+ public DestroyJobWork(NodeControllerService ncs, long predistributedId) {
this.ncs = ncs;
- this.jobId = jobId;
+ this.predistributedId = predistributedId;
}
@Override
public void run() {
try {
- ncs.removeActivityClusterGraph(jobId);
+ ncs.removeActivityClusterGraph(predistributedId);
} catch (HyracksException e) {
try {
- ncs.getClusterController().notifyDistributedJobFailure(jobId,
ncs.getId());
+
ncs.getClusterController().notifyDistributedJobFailure(predistributedId,
ncs.getId());
} catch (Exception e1) {
e1.printStackTrace();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
index 486a420..e9b7f1b 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
@@ -21,7 +21,6 @@
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.ActivityClusterGraph;
-import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.control.common.deployment.DeploymentUtils;
import org.apache.hyracks.control.common.work.AbstractWork;
import org.apache.hyracks.control.nc.NodeControllerService;
@@ -34,25 +33,24 @@
private final NodeControllerService ncs;
private final byte[] acgBytes;
- private final JobId jobId;
+ private final long predestributedId;
- public DistributeJobWork(NodeControllerService ncs, JobId jobId, byte[]
acgBytes) {
+ public DistributeJobWork(NodeControllerService ncs, long predestributedId,
byte[] acgBytes) {
this.ncs = ncs;
- this.jobId = jobId;
+ this.predestributedId = predestributedId;
this.acgBytes = acgBytes;
}
@Override
public void run() {
try {
- ncs.checkForDuplicateDistributedJob(jobId);
- ncs.updateMaxJobId(jobId);
+ ncs.checkForDuplicateDistributedJob(predestributedId);
ActivityClusterGraph acg =
(ActivityClusterGraph)
DeploymentUtils.deserialize(acgBytes, null, ncs.getContext());
- ncs.storeActivityClusterGraph(jobId, acg);
+ ncs.storeActivityClusterGraph(predestributedId, acg);
} catch (HyracksException e) {
try {
- ncs.getClusterController().notifyDistributedJobFailure(jobId,
ncs.getId());
+
ncs.getClusterController().notifyDistributedJobFailure(predestributedId,
ncs.getId());
} catch (Exception e1) {
e1.printStackTrace();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index c369781..76a146d 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -79,6 +79,8 @@
private final JobId jobId;
+ private final long predistributedId;
+
private final byte[] acgBytes;
private final List<TaskAttemptDescriptor> taskDescriptors;
@@ -87,16 +89,21 @@
private final Set<JobFlag> flags;
+ private final Map<byte[], byte[]> jobParameters;
+
public StartTasksWork(NodeControllerService ncs, DeploymentId
deploymentId, JobId jobId, byte[] acgBytes,
List<TaskAttemptDescriptor> taskDescriptors,
- Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap,
Set<JobFlag> flags) {
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap,
Set<JobFlag> flags,
+ Map<byte[], byte[]> jobParameters, long predistributedId) {
this.ncs = ncs;
this.deploymentId = deploymentId;
this.jobId = jobId;
+ this.predistributedId = predistributedId;
this.acgBytes = acgBytes;
this.taskDescriptors = taskDescriptors;
this.connectorPoliciesMap = connectorPoliciesMap;
this.flags = flags;
+ this.jobParameters = jobParameters;
}
@Override
@@ -106,7 +113,7 @@
try {
ncs.updateMaxJobId(jobId);
NCServiceContext serviceCtx = ncs.getContext();
- Joblet joblet = getOrCreateLocalJoblet(deploymentId, jobId,
serviceCtx, acgBytes);
+ Joblet joblet = getOrCreateLocalJoblet(deploymentId,
predistributedId, serviceCtx, acgBytes);
final ActivityClusterGraph acg = joblet.getActivityClusterGraph();
IRecordDescriptorProvider rdp = new IRecordDescriptorProvider() {
@Override
@@ -190,18 +197,19 @@
}
}
- private Joblet getOrCreateLocalJoblet(DeploymentId deploymentId, JobId jobId, INCServiceContext appCtx,
+ private Joblet getOrCreateLocalJoblet(DeploymentId deploymentId, long predistributedId, INCServiceContext appCtx,
byte[] acgBytes) throws HyracksException {
Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
Joblet ji = jobletMap.get(jobId);
if (ji == null) {
- ActivityClusterGraph acg = ncs.getActivityClusterGraph(jobId);
+ ActivityClusterGraph acg = ncs.getActivityClusterGraph(predistributedId);
if (acg == null) {
if (acgBytes == null) {
throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId);
}
acg = (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, deploymentId, appCtx);
}
+ ncs.createOrGetJobParameterByteStore(jobId).setParameters(jobParameters);
ji = new Joblet(ncs, deploymentId, jobId, appCtx, acg);
jobletMap.put(jobId, ji);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
index 4a01fdb..d8ce841 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
@@ -23,6 +23,7 @@
import static org.mockito.Mockito.verify;
import java.io.File;
+import java.util.HashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -39,6 +40,7 @@
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.mockito.Matchers;
import org.mockito.Mockito;
public class PredistributedJobsTest {
@@ -108,62 +110,77 @@
JobSpecification spec2 = HeapSortMergeTest.createSortMergeJobSpec();
//distribute both jobs
- JobId jobId1 = hcc.distributeJob(spec1);
- JobId jobId2 = hcc.distributeJob(spec2);
+ long distributedId1 = hcc.distributeJob(spec1);
+ long distributedId2 = hcc.distributeJob(spec2);
//make sure it finished
//cc will get the store once to check for duplicate insertion and once
to insert per job
verify(cc, Mockito.timeout(5000).times(4)).getPreDistributedJobStore();
- verify(nc1,
Mockito.timeout(5000).times(2)).storeActivityClusterGraph(any(), any());
- verify(nc2,
Mockito.timeout(5000).times(2)).storeActivityClusterGraph(any(), any());
- verify(nc1,
Mockito.timeout(5000).times(2)).checkForDuplicateDistributedJob(any());
- verify(nc2,
Mockito.timeout(5000).times(2)).checkForDuplicateDistributedJob(any());
+ verify(nc1,
Mockito.timeout(5000).times(2)).storeActivityClusterGraph(Matchers.anyLong(),
any());
+ verify(nc2,
Mockito.timeout(5000).times(2)).storeActivityClusterGraph(Matchers.anyLong(),
any());
+ verify(nc1,
Mockito.timeout(5000).times(2)).checkForDuplicateDistributedJob(Matchers.anyLong());
+ verify(nc2,
Mockito.timeout(5000).times(2)).checkForDuplicateDistributedJob(Matchers.anyLong());
//confirm that both jobs are distributed
- Assert.assertTrue(nc1.getActivityClusterGraph(jobId1) != null &&
nc2.getActivityClusterGraph(jobId1) != null);
- Assert.assertTrue(nc1.getActivityClusterGraph(jobId2) != null &&
nc2.getActivityClusterGraph(jobId2) != null);
-
Assert.assertTrue(cc.getPreDistributedJobStore().getDistributedJobDescriptor(jobId1)
!= null);
-
Assert.assertTrue(cc.getPreDistributedJobStore().getDistributedJobDescriptor(jobId2)
!= null);
+ Assert.assertTrue(nc1.getActivityClusterGraph(distributedId1) != null
&& nc2.getActivityClusterGraph(distributedId1) != null);
+ Assert.assertTrue(nc1.getActivityClusterGraph(distributedId2) != null
&& nc2.getActivityClusterGraph(distributedId2) != null);
+
Assert.assertTrue(cc.getPreDistributedJobStore().getDistributedJobDescriptor(distributedId1)
!= null);
+
Assert.assertTrue(cc.getPreDistributedJobStore().getDistributedJobDescriptor(distributedId2)
!= null);
//run the first job
- hcc.startJob(jobId1);
- hcc.waitForCompletion(jobId1);
+ JobId jobRunId1 = hcc.startJob(distributedId1, new HashMap<>());
+ hcc.waitForCompletion(jobRunId1);
+
+ //Make sure the job parameter map was removed
+ verify(cc,
Mockito.timeout(5000).times(1)).removeJobParameterByteStore(any());
+ verify(nc1,
Mockito.timeout(5000).times(1)).removeJobParameterByteStore(any());
+ verify(nc2,
Mockito.timeout(5000).times(1)).removeJobParameterByteStore(any());
//destroy the first job
- hcc.destroyJob(jobId1);
+ hcc.destroyJob(distributedId1);
//make sure it finished
verify(cc, Mockito.timeout(5000).times(8)).getPreDistributedJobStore();
- verify(nc1,
Mockito.timeout(5000).times(1)).removeActivityClusterGraph(any());
- verify(nc2,
Mockito.timeout(5000).times(1)).removeActivityClusterGraph(any());
+ verify(nc1,
Mockito.timeout(5000).times(1)).removeActivityClusterGraph(Matchers.anyLong());
+ verify(nc2,
Mockito.timeout(5000).times(1)).removeActivityClusterGraph(Matchers.anyLong());
//confirm the first job is destroyed
- Assert.assertTrue(nc1.getActivityClusterGraph(jobId1) == null &&
nc2.getActivityClusterGraph(jobId1) == null);
-
cc.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(jobId1);
+ Assert.assertTrue(nc1.getActivityClusterGraph(distributedId1) == null
&& nc2.getActivityClusterGraph(distributedId1) == null);
+
cc.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(distributedId1);
//run the second job
- hcc.startJob(jobId2);
- hcc.waitForCompletion(jobId2);
+ JobId jobRunId2 = hcc.startJob(distributedId2, new HashMap<>());
+ hcc.waitForCompletion(jobRunId2);
+
+ //Make sure the job parameter map was removed
+ verify(cc,
Mockito.timeout(5000).times(2)).removeJobParameterByteStore(any());
+ verify(nc1,
Mockito.timeout(5000).times(2)).removeJobParameterByteStore(any());
+ verify(nc2,
Mockito.timeout(5000).times(2)).removeJobParameterByteStore(any());
//wait ten seconds to ensure the result sweeper does not break the job
//The result sweeper runs every 5 seconds during the tests
Thread.sleep(10000);
//run the second job again
- hcc.startJob(jobId2);
- hcc.waitForCompletion(jobId2);
+ JobId jobRunId3 = hcc.startJob(distributedId2, new HashMap<>());
+ hcc.waitForCompletion(jobRunId3);
+
+ //Make sure the job parameter map was removed
+ verify(cc,
Mockito.timeout(5000).times(3)).removeJobParameterByteStore(any());
+ verify(nc1,
Mockito.timeout(5000).times(3)).removeJobParameterByteStore(any());
+ verify(nc2,
Mockito.timeout(5000).times(3)).removeJobParameterByteStore(any());
//destroy the second job
- hcc.destroyJob(jobId2);
+ hcc.destroyJob(distributedId2);
//make sure it finished
verify(cc,
Mockito.timeout(5000).times(12)).getPreDistributedJobStore();
- verify(nc1,
Mockito.timeout(5000).times(2)).removeActivityClusterGraph(any());
- verify(nc2,
Mockito.timeout(5000).times(2)).removeActivityClusterGraph(any());
+ verify(nc1,
Mockito.timeout(5000).times(2)).removeActivityClusterGraph(Matchers.anyLong());
+ verify(nc2,
Mockito.timeout(5000).times(2)).removeActivityClusterGraph(Matchers.anyLong());
//confirm the second job is destroyed
- Assert.assertTrue(nc1.getActivityClusterGraph(jobId2) == null &&
nc2.getActivityClusterGraph(jobId2) == null);
-
cc.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(jobId2);
+ Assert.assertTrue(nc1.getActivityClusterGraph(distributedId2) == null
&& nc2.getActivityClusterGraph(distributedId2) == null);
+
cc.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(distributedId2);
}
@AfterClass
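Taken together, the client-side lifecycle exercised by this test is: distribute once, start any number of runs (optionally passing per-run parameters), and destroy the distributed graph when done. A sketch of that flow; the interface below only mirrors the calls made in the test, it is not the real IHyracksClientConnection, and the parameter name is purely illustrative:

    import java.util.HashMap;
    import java.util.Map;

    // Stand-in mirroring the client calls exercised by the updated test.
    interface ClientConnectionSketch {
        long distributeJob(Object jobSpecification) throws Exception;
        Object startJob(long distributedId, Map<byte[], byte[]> jobParameters) throws Exception;
        void waitForCompletion(Object jobId) throws Exception;
        void destroyJob(long distributedId) throws Exception;
    }

    class PredistributedLifecycleSketch {
        // Distribute once, run twice (first with an illustrative parameter), destroy when done.
        static void runTwice(ClientConnectionSketch hcc, Object spec) throws Exception {
            long distributedId = hcc.distributeJob(spec);

            Map<byte[], byte[]> params = new HashMap<>();
            params.put("exampleParameter".getBytes(), "exampleValue".getBytes());
            hcc.waitForCompletion(hcc.startJob(distributedId, params));

            hcc.waitForCompletion(hcc.startJob(distributedId, new HashMap<>()));
            hcc.destroyJob(distributedId);
        }
    }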
diff --git
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index b100300..e9c9cff 100644
---
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -160,6 +160,10 @@
}
@Override
+ public byte[] getJobParameter(byte[] name, int start, int length) {
+ return new byte[0];
+ }
+
public Set<JobFlag> getJobFlags() {
return EnumSet.noneOf(JobFlag.class);
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/2045
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I8f493c1fa977d07dfe8a875f9ebe9515d01d1473
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <[email protected]>