Steven Jacobs has submitted this change and it was merged. Change subject: [ASTERIXDB-2373][HYR,CLUS] Allow upsert of JobSpecs for Deployed Jobs ......................................................................
[ASTERIXDB-2373][HYR,CLUS] Allow upsert of JobSpecs for Deployed Jobs - user model changes: none - storage format changes: none - interface changes: new methods added This change adds the upsertDeployedJobSpec method, enabling a Deployed Job to update its Job Specification. Added call in test. Removed DeployedJobService (moved methods to BAD codebase). Change-Id: I01fd5d43896d520fe75e1007d7bd39324f6f6e4b Reviewed-on: https://asterix-gerrit.ics.uci.edu/2619 Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Xikui Wang <xkk...@gmail.com> --- D asterixdb/asterix-active/src/main/java/org/apache/asterix/active/DeployedJobService.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/DeployedJobSpecStore.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployJobSpecWork.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployJobSpecWork.java M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/DeployedJobSpecsTest.java 16 files changed, 115 insertions(+), 143 deletions(-) Approvals: Anon. E. Moose #1000171: Jenkins: Verified; Xikui Wang: Looks good to me, approved Objections: Jenkins: Violations found diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/DeployedJobService.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/DeployedJobService.java deleted file mode 100644 index bc6f1b1..0000000 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/DeployedJobService.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.active; - -import java.time.Instant; -import java.util.Date; -import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import org.apache.asterix.common.transactions.ITxnIdFactory; -import org.apache.hyracks.api.client.IHyracksClientConnection; -import org.apache.hyracks.api.job.DeployedJobSpecId; -import org.apache.hyracks.api.job.JobId; -import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -/** - * Provides functionality for running DeployedJobSpecs - */ -public class DeployedJobService { - - private static final Logger LOGGER = LogManager.getLogger(); - - //To enable new Asterix TxnId for separate deployed job spec invocations - private static final byte[] TRANSACTION_ID_PARAMETER_NAME = "TxnIdParameter".getBytes(); - - //pool size one (only running one thread at a time) - private static final int POOL_SIZE = 1; - - //Starts running a deployed job specification periodically with an interval of "duration" seconds - public static ScheduledExecutorService startRepetitiveDeployedJobSpec(DeployedJobSpecId distributedId, - IHyracksClientConnection hcc, long duration, Map<byte[], byte[]> jobParameters, EntityId entityId, - ITxnIdFactory txnIdFactory) { - ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(POOL_SIZE); - scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - if (!runRepetitiveDeployedJobSpec(distributedId, hcc, jobParameters, duration, entityId, - txnIdFactory)) { - scheduledExecutorService.shutdown(); - } - } catch (Exception e) { - LOGGER.log(Level.ERROR, "Job Failed to run for " + entityId.getExtensionName() + " " - + entityId.getDataverse() + "." + entityId.getEntityName() + ".", e); - } - } - }, duration, duration, TimeUnit.MILLISECONDS); - return scheduledExecutorService; - } - - public static boolean runRepetitiveDeployedJobSpec(DeployedJobSpecId distributedId, IHyracksClientConnection hcc, - Map<byte[], byte[]> jobParameters, long duration, EntityId entityId, ITxnIdFactory txnIdFactory) - throws Exception { - long executionMilliseconds = runDeployedJobSpec(distributedId, hcc, jobParameters, entityId, txnIdFactory); - if (executionMilliseconds > duration && LOGGER.isErrorEnabled()) { - LOGGER.log(Level.ERROR, - "Periodic job for " + entityId.getExtensionName() + " " + entityId.getDataverse() + "." - + entityId.getEntityName() + " was unable to meet the required period of " + duration - + " milliseconds. Actually took " + executionMilliseconds + " execution will shutdown" - + new Date()); - return false; - } - return true; - } - - public synchronized static long runDeployedJobSpec(DeployedJobSpecId distributedId, IHyracksClientConnection hcc, - Map<byte[], byte[]> jobParameters, EntityId entityId, ITxnIdFactory txnIdFactory) throws Exception { - JobId jobId; - long startTime = Instant.now().toEpochMilli(); - - //Add the Asterix Transaction Id to the map - jobParameters.put(TRANSACTION_ID_PARAMETER_NAME, String.valueOf(txnIdFactory.create().getId()).getBytes()); - jobId = hcc.startJob(distributedId, jobParameters); - - hcc.waitForCompletion(jobId); - long executionMilliseconds = Instant.now().toEpochMilli() - startTime; - - LOGGER.log(Level.INFO, - "Deployed Job execution completed for " + entityId.getExtensionName() + " " + entityId.getDataverse() - + "." + entityId.getEntityName() + ". Took " + executionMilliseconds + " milliseconds "); - - return executionMilliseconds; - - } - - @Override - public String toString() { - return "DeployedJobSpecService"; - } - -} diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java index 23c41fe..7182f42 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java @@ -39,8 +39,9 @@ GET_JOB_STATUS, GET_JOB_INFO, START_JOB, - DISTRIBUTE_JOB, - DESTROY_JOB, + DEPLOY_JOB, + UNDEPLOY_JOB, + UPSERT_DEPLOYED_JOB, CANCEL_JOB, GET_DATASET_DIRECTORY_SERIVICE_INFO, GET_DATASET_RESULT_STATUS, @@ -107,6 +108,32 @@ } } + public static class UpsertDeployedJobSpecFunction extends Function { + private static final long serialVersionUID = 1L; + + private final byte[] acggfBytes; + + private final DeployedJobSpecId deployedJobSpecId; + + public UpsertDeployedJobSpecFunction(DeployedJobSpecId deployedJobSpecId, byte[] acggfBytes) { + this.deployedJobSpecId = deployedJobSpecId; + this.acggfBytes = acggfBytes; + } + + @Override + public FunctionId getFunctionId() { + return FunctionId.UPSERT_DEPLOYED_JOB; + } + + public byte[] getACGGFBytes() { + return acggfBytes; + } + + public DeployedJobSpecId getDeployedJobSpecId() { + return deployedJobSpecId; + } + } + public static class DeployJobSpecFunction extends Function { private static final long serialVersionUID = 1L; @@ -118,7 +145,7 @@ @Override public FunctionId getFunctionId() { - return FunctionId.DISTRIBUTE_JOB; + return FunctionId.DEPLOY_JOB; } public byte[] getACGGFBytes() { @@ -159,7 +186,7 @@ @Override public FunctionId getFunctionId() { - return FunctionId.DESTROY_JOB; + return FunctionId.UNDEPLOY_JOB; } public DeployedJobSpecId getDeployedJobSpecId() { diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java index eddcaa5..07ca6b0 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java @@ -98,6 +98,14 @@ } @Override + public DeployedJobSpecId upsertDeployedJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] acggfBytes) + throws Exception { + HyracksClientInterfaceFunctions.UpsertDeployedJobSpecFunction udjsf = + new HyracksClientInterfaceFunctions.UpsertDeployedJobSpecFunction(deployedJobSpecId, acggfBytes); + return (DeployedJobSpecId) rpci.call(ipcHandle, udjsf); + } + + @Override public DeployedJobSpecId undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception { HyracksClientInterfaceFunctions.UndeployJobSpecFunction sjf = new HyracksClientInterfaceFunctions.UndeployJobSpecFunction(deployedJobSpecId); diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java index f676d27..5b98778 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java @@ -111,6 +111,14 @@ } @Override + public DeployedJobSpecId upsertDeployedJobSpec(DeployedJobSpecId deployedJobSpecId, JobSpecification jobSpec) + throws Exception { + JobSpecificationActivityClusterGraphGeneratorFactory jsacggf = + new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec); + return hci.upsertDeployedJobSpec(deployedJobSpecId, JavaSerializationUtils.serialize(jsacggf)); + } + + @Override public DeployedJobSpecId deployJobSpec(JobSpecification jobSpec) throws Exception { JobSpecificationActivityClusterGraphGeneratorFactory jsacggf = new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec); diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java index 510a6b6..61d1418 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java @@ -99,6 +99,18 @@ DeployedJobSpecId deployJobSpec(JobSpecification jobSpec) throws Exception; /** + * Update the JobSpec for a deployed job. + * + * @param deployedJobSpecId + * The id of the deployed job spec + * @param jobSpec + * Job Specification + * @throws Exception + */ + DeployedJobSpecId upsertDeployedJobSpec(DeployedJobSpecId deployedJobSpecId, JobSpecification jobSpec) + throws Exception; + + /** * Remove the deployed Job Spec * * @param deployedJobSpecId diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java index f0c7872..2b92bcd 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java @@ -45,6 +45,9 @@ public DeployedJobSpecId deployJobSpec(byte[] acggfBytes) throws Exception; + public DeployedJobSpecId upsertDeployedJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] acggfBytes) + throws Exception; + public DeployedJobSpecId undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception; public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception; diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java index e46aa7f..f123c8a 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java @@ -85,13 +85,19 @@ ccs.getWorkQueue().schedule(new GetJobInfoWork(ccs.getJobManager(), gjif.getJobId(), new IPCResponder<JobInfo>(handle, mid))); break; - case DISTRIBUTE_JOB: + case DEPLOY_JOB: HyracksClientInterfaceFunctions.DeployJobSpecFunction djf = (HyracksClientInterfaceFunctions.DeployJobSpecFunction) fn; ccs.getWorkQueue().schedule(new DeployJobSpecWork(ccs, djf.getACGGFBytes(), - deployedJobSpecIdFactory.create(), new IPCResponder<>(handle, mid))); + deployedJobSpecIdFactory.create(), false, new IPCResponder<>(handle, mid))); break; - case DESTROY_JOB: + case UPSERT_DEPLOYED_JOB: + HyracksClientInterfaceFunctions.UpsertDeployedJobSpecFunction udjsf = + (HyracksClientInterfaceFunctions.UpsertDeployedJobSpecFunction) fn; + ccs.getWorkQueue().schedule(new DeployJobSpecWork(ccs, udjsf.getACGGFBytes(), + udjsf.getDeployedJobSpecId(), true, new IPCResponder<>(handle, mid))); + break; + case UNDEPLOY_JOB: HyracksClientInterfaceFunctions.UndeployJobSpecFunction dsjf = (HyracksClientInterfaceFunctions.UndeployJobSpecFunction) fn; ccs.getWorkQueue().schedule( diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/DeployedJobSpecStore.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/DeployedJobSpecStore.java index 0e22c25..041e224 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/DeployedJobSpecStore.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/DeployedJobSpecStore.java @@ -40,9 +40,6 @@ public void addDeployedJobSpecDescriptor(DeployedJobSpecId deployedJobSpecId, ActivityClusterGraph activityClusterGraph, JobSpecification jobSpecification, Set<Constraint> activityClusterGraphConstraints) throws HyracksException { - if (deployedJobSpecDescriptorMap.get(deployedJobSpecId) != null) { - throw HyracksException.create(ErrorCode.DUPLICATE_DEPLOYED_JOB, deployedJobSpecId); - } DeployedJobSpecDescriptor descriptor = new DeployedJobSpecDescriptor(activityClusterGraph, jobSpecification, activityClusterGraphConstraints); deployedJobSpecDescriptorMap.put(deployedJobSpecId, descriptor); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployJobSpecWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployJobSpecWork.java index c51f3c5..60c88c7 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployJobSpecWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployJobSpecWork.java @@ -39,20 +39,24 @@ private final byte[] acggfBytes; private final DeployedJobSpecId deployedJobSpecId; private final IResultCallback<DeployedJobSpecId> callback; + private final boolean upsert; public DeployJobSpecWork(ClusterControllerService ccs, byte[] acggfBytes, DeployedJobSpecId deployedJobSpecId, - IResultCallback<DeployedJobSpecId> callback) { + boolean upsert, IResultCallback<DeployedJobSpecId> callback) { this.deployedJobSpecId = deployedJobSpecId; this.ccs = ccs; this.acggfBytes = acggfBytes; this.callback = callback; + this.upsert = upsert; } @Override protected void doRun() throws Exception { try { final CCServiceContext ccServiceCtx = ccs.getContext(); - ccs.getDeployedJobSpecStore().checkForExistingDeployedJobSpecDescriptor(deployedJobSpecId); + if (!upsert) { + ccs.getDeployedJobSpecStore().checkForExistingDeployedJobSpecDescriptor(deployedJobSpecId); + } IActivityClusterGraphGeneratorFactory acggf = (IActivityClusterGraphGeneratorFactory) DeploymentUtils.deserialize(acggfBytes, null, ccServiceCtx); IActivityClusterGraphGenerator acgg = @@ -65,7 +69,7 @@ INodeManager nodeManager = ccs.getNodeManager(); for (NodeControllerState node : nodeManager.getAllNodeControllerStates()) { - node.getNodeController().deployJobSpec(deployedJobSpecId, acgBytes); + node.getNodeController().deployJobSpec(deployedJobSpecId, acgBytes, upsert); } callback.setValue(deployedJobSpecId); } catch (Exception e) { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java index 78cd44d..fa835f4 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java @@ -54,7 +54,8 @@ void undeployBinary(DeploymentId deploymentId) throws Exception; - void deployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] planBytes) throws Exception; + void deployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] planBytes, boolean checkForDuplicate) + throws Exception; void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception; diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java index 8e02936..dea5198 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java @@ -103,8 +103,8 @@ SHUTDOWN_REQUEST, SHUTDOWN_RESPONSE, - DISTRIBUTE_JOB, - DESTROY_JOB, + DEPLOY_JOB, + UNDEPLOY_JOB, DEPLOYED_JOB_FAILURE, STATE_DUMP_REQUEST, @@ -713,15 +713,18 @@ private final byte[] acgBytes; - public DeployJobSpecFunction(DeployedJobSpecId deployedJobSpecId, byte[] acgBytes, CcId ccId) { + private final boolean upsert; + + public DeployJobSpecFunction(DeployedJobSpecId deployedJobSpecId, byte[] acgBytes, boolean upsert, CcId ccId) { super(ccId); this.deployedJobSpecId = deployedJobSpecId; this.acgBytes = acgBytes; + this.upsert = upsert; } @Override public FunctionId getFunctionId() { - return FunctionId.DISTRIBUTE_JOB; + return FunctionId.DEPLOY_JOB; } public DeployedJobSpecId getDeployedJobSpecId() { @@ -730,6 +733,10 @@ public byte[] getacgBytes() { return acgBytes; + } + + public boolean getUpsert() { + return upsert; } } @@ -745,7 +752,7 @@ @Override public FunctionId getFunctionId() { - return FunctionId.DESTROY_JOB; + return FunctionId.UNDEPLOY_JOB; } public DeployedJobSpecId getDeployedJobSpecId() { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java index d6867eb..8242bdc 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java @@ -103,8 +103,8 @@ } @Override - public void deployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] planBytes) throws Exception { - DeployJobSpecFunction fn = new DeployJobSpecFunction(deployedJobSpecId, planBytes, ccId); + public void deployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] planBytes, boolean upsert) throws Exception { + DeployJobSpecFunction fn = new DeployJobSpecFunction(deployedJobSpecId, planBytes, upsert, ccId); ipcHandle.send(-1, fn, null); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java index 735f7cf..08cd5d8 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java @@ -107,13 +107,13 @@ ncs.getWorkQueue().schedule(new UnDeployBinaryWork(ncs, ndbf.getDeploymentId(), ndbf.getCcId())); return; - case DISTRIBUTE_JOB: + case DEPLOY_JOB: CCNCFunctions.DeployJobSpecFunction djf = (CCNCFunctions.DeployJobSpecFunction) fn; - ncs.getWorkQueue().schedule( - new DeployJobSpecWork(ncs, djf.getDeployedJobSpecId(), djf.getacgBytes(), djf.getCcId())); + ncs.getWorkQueue().schedule(new DeployJobSpecWork(ncs, djf.getDeployedJobSpecId(), djf.getacgBytes(), + djf.getUpsert(), djf.getCcId())); return; - case DESTROY_JOB: + case UNDEPLOY_JOB: CCNCFunctions.UndeployJobSpecFunction dsjf = (CCNCFunctions.UndeployJobSpecFunction) fn; ncs.getWorkQueue().schedule(new UndeployJobSpecWork(ncs, dsjf.getDeployedJobSpecId(), dsjf.getCcId())); return; diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index 6a7d645..aa2320a 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -470,7 +470,6 @@ } private ConcurrentHashMap<CcId, Serializable> getDistributedState() { - //noinspection unchecked return (ConcurrentHashMap<CcId, Serializable>) serviceCtx.getDistributedState(); } @@ -566,9 +565,6 @@ public void storeActivityClusterGraph(DeployedJobSpecId deployedJobSpecId, ActivityClusterGraph acg) throws HyracksException { - if (deployedJobSpecActivityClusterGraphMap.get(deployedJobSpecId.getId()) != null) { - throw HyracksException.create(ErrorCode.DUPLICATE_DEPLOYED_JOB, deployedJobSpecId); - } deployedJobSpecActivityClusterGraphMap.put(deployedJobSpecId.getId(), acg); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployJobSpecWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployJobSpecWork.java index 92612dd..bcdb97f 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployJobSpecWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployJobSpecWork.java @@ -28,7 +28,7 @@ import org.apache.hyracks.control.nc.NodeControllerService; /** - * pre-distribute a job that can be executed later + * Deploy a job that can be executed later * */ public class DeployJobSpecWork extends AbstractWork { @@ -37,19 +37,23 @@ private final byte[] acgBytes; private final CcId ccId; private final DeployedJobSpecId deployedJobSpecId; + private final boolean upsert; public DeployJobSpecWork(NodeControllerService ncs, DeployedJobSpecId deployedJobSpecId, byte[] acgBytes, - CcId ccId) { + boolean upsert, CcId ccId) { this.ncs = ncs; this.deployedJobSpecId = deployedJobSpecId; this.acgBytes = acgBytes; this.ccId = ccId; + this.upsert = upsert; } @Override public void run() { try { - ncs.checkForDuplicateDeployedJobSpec(deployedJobSpecId); + if (!upsert) { + ncs.checkForDuplicateDeployedJobSpec(deployedJobSpecId); + } ActivityClusterGraph acg = (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, null, ncs.getContext()); ncs.storeActivityClusterGraph(deployedJobSpecId, acg); diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/DeployedJobSpecsTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/DeployedJobSpecsTest.java index 40b6b27..834fab5 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/DeployedJobSpecsTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/DeployedJobSpecsTest.java @@ -199,6 +199,16 @@ for (int i = 0; i < 100; i++) { hcc.startJob(distributedId2, new HashMap<>()); } + + //Change the second job into the first job and see whether it runs + hcc.upsertDeployedJobSpec(distributedId2, spec1); + JobId jobRunId4 = hcc.startJob(distributedId2, new HashMap<>()); + hcc.waitForCompletion(jobRunId4); + + //Run it one more time + JobId jobRunId5 = hcc.startJob(distributedId2, new HashMap<>()); + hcc.waitForCompletion(jobRunId5); + } @AfterClass -- To view, visit https://asterix-gerrit.ics.uci.edu/2619 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I01fd5d43896d520fe75e1007d7bd39324f6f6e4b Gerrit-PatchSet: 7 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Steven Jacobs <sjaco...@ucr.edu> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Gerrit-Reviewer: Steven Jacobs <sjaco...@ucr.edu> Gerrit-Reviewer: Till Westmann <ti...@apache.org> Gerrit-Reviewer: Xikui Wang <xkk...@gmail.com>