Steven Jacobs has submitted this change and it was merged. Change subject: ASTERIXDB-1747 Implemented full lifecycle capabilities for pre-distributed jobs ......................................................................
ASTERIXDB-1747 Implemented full lifecycle capabilities for pre-distributed jobs Added distribute and destroy functionality Removed serialization and bytes when running pre-distributed jobs Cleaned up methods Enabled Mockito testing for CCS and NCS Added Unit Test for Distributed Jobs Change-Id: I59c3422d5c1ab7756a6a4685ac527dfe50434954 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1377 Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Yingyi Bu <[email protected]> --- M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java M hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributedJobFailureWork.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java A hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java M 
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java 42 files changed, 1,002 insertions(+), 137 deletions(-) Approvals: Yingyi Bu: Looks good to me, approved Jenkins: Verified; Verified Objections: Jenkins: Violations found; Violations found diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java index 5ff02c7..0d1d8ab 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java @@ -35,13 +35,13 @@ private static final Logger LOGGER = Logger.getLogger(ActiveJobNotificationHandler.class.getName()); private static final boolean DEBUG = false; private final LinkedBlockingQueue<ActiveEvent> eventInbox; - private final Map<EntityId, IActiveEntityEventsListener> entityEventListener; + private final Map<EntityId, IActiveEntityEventsListener> entityEventListeners; private final Map<JobId, ActiveJob> jobId2ActiveJobInfos; private ActiveJobNotificationHandler() { this.eventInbox = new LinkedBlockingQueue<>(); this.jobId2ActiveJobInfos = new HashMap<>(); - this.entityEventListener = new HashMap<>(); + this.entityEventListeners = new HashMap<>(); } @Override @@ -53,15 +53,14 @@ ActiveEvent event = getEventInbox().take(); ActiveJob jobInfo = jobId2ActiveJobInfos.get(event.getJobId()); EntityId entityId = jobInfo.getEntityId(); - IActiveEntityEventsListener listener = entityEventListener.get(entityId); + IActiveEntityEventsListener listener = entityEventListeners.get(entityId); if (DEBUG) { LOGGER.log(Level.WARNING, "Next event is of type " + event.getEventKind()); LOGGER.log(Level.WARNING, "Notifying the listener"); } listener.notify(event); if (event.getEventKind() == EventKind.JOB_FINISH) 
{ - removeFinishedJob(event.getJobId()); - removeInactiveListener(listener); + removeJob(event.getJobId(), listener); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -72,11 +71,18 @@ LOGGER.log(Level.INFO, "Stopped " + ActiveJobNotificationHandler.class.getSimpleName()); } - private void removeFinishedJob(JobId jobId) { - if (DEBUG) { - LOGGER.log(Level.WARNING, "Removing the job"); + public void removeJob(JobId jobId, IActiveEntityEventsListener listener) { + removeFinishedJob(jobId, listener); + removeInactiveListener(listener); + } + + private void removeFinishedJob(JobId jobId, IActiveEntityEventsListener listener) { + if (!listener.isEntityActive()) { + if (DEBUG) { + LOGGER.log(Level.WARNING, "Remove job" + jobId); + } + jobId2ActiveJobInfos.remove(jobId); } - jobId2ActiveJobInfos.remove(jobId); } private void removeInactiveListener(IActiveEntityEventsListener listener) { @@ -84,17 +90,17 @@ if (DEBUG) { LOGGER.log(Level.WARNING, "Removing the listener since it is not active anymore"); } - entityEventListener.remove(listener.getEntityId()); + entityEventListeners.remove(listener.getEntityId()); } } public IActiveEntityEventsListener getActiveEntityListener(EntityId entityId) { if (DEBUG) { LOGGER.log(Level.WARNING, "getActiveEntityListener(EntityId entityId) was called with entity " + entityId); - IActiveEntityEventsListener listener = entityEventListener.get(entityId); + IActiveEntityEventsListener listener = entityEventListeners.get(entityId); LOGGER.log(Level.WARNING, "Listener found: " + listener); } - return entityEventListener.get(entityId); + return entityEventListeners.get(entityId); } public synchronized ActiveJob[] getActiveJobs() { @@ -141,7 +147,7 @@ ActiveJob jobInfo = jobId2ActiveJobInfos.get(jobId); if (jobInfo != null) { EntityId entityId = jobInfo.getEntityId(); - IActiveEntityEventsListener listener = entityEventListener.get(entityId); + IActiveEntityEventsListener listener = entityEventListeners.get(entityId); 
listener.notifyJobCreation(jobId, jobSpecification); if (DEBUG) { LOGGER.log(Level.WARNING, "Listener was notified" + jobId); @@ -161,9 +167,9 @@ public synchronized IActiveEntityEventsListener[] getEventListeners() { if (DEBUG) { LOGGER.log(Level.WARNING, "getEventListeners() was called"); - LOGGER.log(Level.WARNING, "returning " + entityEventListener.size() + " Listeners"); + LOGGER.log(Level.WARNING, "returning " + entityEventListeners.size() + " Listeners"); } - return entityEventListener.values().toArray(new IActiveEntityEventsListener[entityEventListener.size()]); + return entityEventListeners.values().toArray(new IActiveEntityEventsListener[entityEventListeners.size()]); } public synchronized void registerListener(IActiveEntityEventsListener listener) throws HyracksDataException { @@ -172,11 +178,11 @@ "registerListener(IActiveEntityEventsListener listener) was called for the entity " + listener.getEntityId()); } - if (entityEventListener.containsKey(listener.getEntityId())) { + if (entityEventListeners.containsKey(listener.getEntityId())) { throw new HyracksDataException( "Active Entity Listener " + listener.getEntityId() + " is already registered"); } - entityEventListener.put(listener.getEntityId(), listener); + entityEventListeners.put(listener.getEntityId(), listener); } public synchronized void monitorJob(JobId jobId, ActiveJob activeJob) { @@ -185,7 +191,7 @@ boolean found = jobId2ActiveJobInfos.get(jobId) != null; LOGGER.log(Level.WARNING, "Job was found to be: " + (found ? 
"Active" : "Inactive")); } - if (entityEventListener.containsKey(activeJob.getEntityId())) { + if (entityEventListeners.containsKey(activeJob.getEntityId())) { if (jobId2ActiveJobInfos.containsKey(jobId)) { LOGGER.severe("Job is already being monitored for job: " + jobId); return; @@ -205,7 +211,7 @@ "unregisterListener(IActiveEntityEventsListener listener) was called for the entity " + listener.getEntityId()); } - IActiveEntityEventsListener registeredListener = entityEventListener.remove(listener.getEntityId()); + IActiveEntityEventsListener registeredListener = entityEventListeners.remove(listener.getEntityId()); if (registeredListener == null) { throw new HyracksDataException( "Active Entity Listener " + listener.getEntityId() + " hasn't been registered"); diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java index 06e9ad1..fad30fa 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java @@ -26,9 +26,9 @@ import org.apache.asterix.active.message.ActivePartitionMessage; import org.apache.hyracks.api.exceptions.HyracksException; -import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory; import org.apache.hyracks.api.job.IJobLifecycleListener; import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.JobSpecification; public class ActiveLifecycleListener implements IJobLifecycleListener { @@ -65,8 +65,8 @@ } @Override - public void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf) throws HyracksException { - ActiveJobNotificationHandler.INSTANCE.notifyJobCreation(jobId, acggf.getJobSpecification()); + public void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException { + 
ActiveJobNotificationHandler.INSTANCE.notifyJobCreation(jobId, spec); } public void receive(ActivePartitionMessage message) { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 3c69d83..978c2eb 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -233,6 +233,10 @@ this.executorService = executorService; } + public SessionConfig getSessionConfig() { + return sessionConfig; + } + protected List<FunctionDecl> getDeclaredFunctions(List<Statement> statements) { List<FunctionDecl> functionDecls = new ArrayList<>(); for (Statement st : statements) { @@ -343,7 +347,7 @@ handleInsertUpsertStatement(metadataProvider, stmt, hcc, hdc, resultDelivery, stats, false); break; case Statement.Kind.DELETE: - handleDeleteStatement(metadataProvider, stmt, hcc); + handleDeleteStatement(metadataProvider, stmt, hcc, false); break; case Statement.Kind.CREATE_PRIMARY_FEED: case Statement.Kind.CREATE_SECONDARY_FEED: @@ -1403,7 +1407,7 @@ // prepare job spec(s) that would disconnect any active feeds involving the dataset. 
IActiveEntityEventsListener[] activeListeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners(); for (IActiveEntityEventsListener listener : activeListeners) { - if (listener.isEntityUsingDataset(dataverseName, datasetName)) { + if (listener.isEntityUsingDataset(dataverseName, datasetName) && listener.isEntityActive()) { throw new CompilationException( "Can't drop dataset since it is connected to active entity: " + listener.getEntityId()); } @@ -1824,7 +1828,7 @@ } } - public void handleInsertUpsertStatement(MetadataProvider metadataProvider, Statement stmt, + public JobSpecification handleInsertUpsertStatement(MetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, IStatementExecutor.Stats stats, boolean compileOnly) throws Exception { @@ -1856,7 +1860,7 @@ final JobSpecification jobSpec = rewriteCompileInsertUpsert(hcc, metadataProvider, stmtInsertUpsert); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; - return compileOnly ? 
null : jobSpec; + return jobSpec; } catch (Exception e) { if (bActiveTxn) { abort(e, e, mdTxnCtx); @@ -1864,6 +1868,9 @@ throw e; } }; + if (compileOnly) { + return compiler.compile(); + } if (stmtInsertUpsert.getReturnExpression() != null) { deliverResult(hcc, hdc, compiler, metadataProvider, locker, resultDelivery, stats); @@ -1872,17 +1879,18 @@ try { final JobSpecification jobSpec = compiler.compile(); if (jobSpec == null) { - return; + return jobSpec; } JobUtils.runJob(hcc, jobSpec, true); } finally { locker.unlock(); } } + return null; } - public void handleDeleteStatement(MetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc) - throws Exception { + public JobSpecification handleDeleteStatement(MetadataProvider metadataProvider, Statement stmt, + IHyracksClientConnection hcc, boolean compileOnly) throws Exception { DeleteStatement stmtDelete = (DeleteStatement) stmt; String dataverseName = getActiveDataverse(stmtDelete.getDataverseName()); @@ -1903,9 +1911,10 @@ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; - if (jobSpec != null) { + if (jobSpec != null && !compileOnly) { JobUtils.runJob(hcc, jobSpec, true); } + return jobSpec; } catch (Exception e) { if (bActiveTxn) { diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java index 55cd304..d55fde5 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java @@ -278,7 +278,7 @@ // prepare job spec(s) that would disconnect any active feeds involving the dataset. 
IActiveEntityEventsListener[] activeListeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners(); for (IActiveEntityEventsListener listener : activeListeners) { - if (listener.isEntityUsingDataset(dataverseName, datasetName)) { + if (listener.isEntityActive() && listener.isEntityUsingDataset(dataverseName, datasetName)) { throw new CompilationException(ErrorCode.COMPILATION_CANT_DROP_ACTIVE_DATASET, RecordUtil.toFullyQualifiedName(dataverseName, datasetName), listener.getEntityId().toString()); diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java index 780e205..aa292f6 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java @@ -36,6 +36,8 @@ GET_JOB_STATUS, GET_JOB_INFO, START_JOB, + DISTRIBUTE_JOB, + DESTROY_JOB, GET_DATASET_DIRECTORY_SERIVICE_INFO, GET_DATASET_RESULT_STATUS, GET_DATASET_RESULT_LOCATIONS, @@ -101,6 +103,44 @@ } } + public static class DistributeJobFunction extends Function { + private static final long serialVersionUID = 1L; + + private final byte[] acggfBytes; + + public DistributeJobFunction(byte[] acggfBytes) { + this.acggfBytes = acggfBytes; + } + + @Override + public FunctionId getFunctionId() { + return FunctionId.DISTRIBUTE_JOB; + } + + public byte[] getACGGFBytes() { + return acggfBytes; + } + } + + public static class DestroyJobFunction extends Function { + private static final long serialVersionUID = 1L; + + private final JobId jobId; + + public DestroyJobFunction(JobId jobId) { + this.jobId = jobId; + } + + @Override + public FunctionId getFunctionId() { + return FunctionId.DESTROY_JOB; + } + + public JobId getJobId() { + return 
jobId; + } + } + public static class StartJobFunction extends Function { private static final long serialVersionUID = 1L; @@ -116,8 +156,8 @@ this.jobId = jobId; } - public StartJobFunction(byte[] acggfBytes, EnumSet<JobFlag> jobFlags, JobId jobId) { - this(null, acggfBytes, jobFlags, jobId); + public StartJobFunction(JobId jobId) { + this(null, null, null, jobId); } public StartJobFunction(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) { diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java index c049007..8e7affb 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java @@ -69,9 +69,9 @@ } @Override - public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags, JobId jobId) throws Exception { + public JobId startJob(JobId jobId) throws Exception { HyracksClientInterfaceFunctions.StartJobFunction sjf = - new HyracksClientInterfaceFunctions.StartJobFunction(acggfBytes, jobFlags, jobId); + new HyracksClientInterfaceFunctions.StartJobFunction(jobId); return (JobId) rpci.call(ipcHandle, sjf); } @@ -83,6 +83,20 @@ } @Override + public JobId distributeJob(byte[] acggfBytes) throws Exception { + HyracksClientInterfaceFunctions.DistributeJobFunction sjf = + new HyracksClientInterfaceFunctions.DistributeJobFunction(acggfBytes); + return (JobId) rpci.call(ipcHandle, sjf); + } + + @Override + public JobId destroyJob(JobId jobId) throws Exception { + HyracksClientInterfaceFunctions.DestroyJobFunction sjf = + new HyracksClientInterfaceFunctions.DestroyJobFunction(jobId); + return (JobId) rpci.call(ipcHandle, sjf); + } + + @Override public NetworkAddress 
getDatasetDirectoryServiceInfo() throws Exception { HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction gddsf = new HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction(); diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java index 008a640..5da1f34 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java @@ -103,19 +103,28 @@ } @Override - public JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags, JobId jobId) throws Exception { + public JobId distributeJob(JobSpecification jobSpec) throws Exception { JobSpecificationActivityClusterGraphGeneratorFactory jsacggf = new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec); - return startJob(jsacggf, jobFlags, jobId); + return distributeJob(jsacggf); + } + + @Override + public JobId destroyJob(JobId jobId) throws Exception { + return hci.destroyJob(jobId); + } + + @Override + public JobId startJob(JobId jobId) throws Exception { + return hci.startJob(jobId); } public JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) throws Exception { return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags); } - public JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags, JobId jobId) - throws Exception { - return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags, jobId); + public JobId distributeJob(IActivityClusterGraphGeneratorFactory acggf) throws Exception { + return hci.distributeJob(JavaSerializationUtils.serialize(acggf)); } public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception { diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java index c4eba3d..e65cacd 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java @@ -77,17 +77,33 @@ public JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception; /** - * Start the specified Job. + * Distribute the specified Job. * * @param jobSpec * Job Specification * @param jobFlags * Flags - * @param jobId - * Used to run a pre-distributed job by id (the same value will be returned) * @throws Exception */ - public JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags, JobId jobId) throws Exception; + public JobId distributeJob(JobSpecification jobSpec) throws Exception; + + /** + * Destroy the distributed graph for a pre-distributed job + * + * @param jobId + * The id of the predistributed job + * @throws Exception + */ + public JobId destroyJob(JobId jobId) throws Exception; + + /** + * Used to run a pre-distributed job by id (the same JobId will be returned) + * + * @param jobId + * The id of the predistributed job + * @throws Exception + */ + public JobId startJob(JobId jobId) throws Exception; /** * Start the specified Job. 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java index 39063c6..f7995d7 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java @@ -38,7 +38,11 @@ public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception; - public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags, JobId jobId) throws Exception; + public JobId startJob(JobId jobId) throws Exception; + + public JobId distributeJob(byte[] acggfBytes) throws Exception; + + public JobId destroyJob(JobId jobId) throws Exception; public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception; diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java index 8b24cc2..3d99cdb 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java @@ -55,6 +55,9 @@ public static final int INCONSISTENT_RESULT_METADATA = 18; public static final int CANNOT_TRUNCATE_OR_DELETE_FILE = 19; public static final int NOT_A_JOBID = 20; + public static final int ERROR_FINDING_DISTRIBUTED_JOB = 21; + public static final int DUPLICATE_DISTRIBUTED_JOB = 22; + public static final int DISTRIBUTED_JOB_FAILURE = 23; // Compilation error codes. 
public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10001; diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java index cca4a13..30ffebe 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java @@ -21,7 +21,7 @@ import org.apache.hyracks.api.exceptions.HyracksException; public interface IJobLifecycleListener { - public void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf) throws HyracksException; + public void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException; public void notifyJobStart(JobId jobId) throws HyracksException; diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java index 1656c51..a33c6c9 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java @@ -19,6 +19,5 @@ package org.apache.hyracks.api.job; public enum JobFlag { - PROFILE_RUNTIME, - STORE_JOB + PROFILE_RUNTIME } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties index 2abca66..7f90c35 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties @@ -39,5 +39,8 @@ 18 = Inconsistent metadata for result set %1$s" 19 = Can't truncate or delete the file: %1$s 20 = '%1$s' is not 
a valid job id. +21 = The distributed job %1$s was not found +22 = The distributed job %1$s already exists +23 = The distributed work failed for %1$s at %2$s 10000 = The given rule collection %1$s is not an instance of the List class. diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java index 7ea5f70..265d3ef 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java @@ -27,10 +27,11 @@ import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobIdFactory; import org.apache.hyracks.api.job.JobInfo; -import org.apache.hyracks.api.job.JobStatus; import org.apache.hyracks.control.cc.work.CliDeployBinaryWork; import org.apache.hyracks.control.cc.work.CliUnDeployBinaryWork; import org.apache.hyracks.control.cc.work.ClusterShutdownWork; +import org.apache.hyracks.control.cc.work.DestroyJobWork; +import org.apache.hyracks.control.cc.work.DistributeJobWork; import org.apache.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork; import org.apache.hyracks.control.cc.work.GetJobInfoWork; import org.apache.hyracks.control.cc.work.GetJobStatusWork; @@ -81,18 +82,34 @@ ccs.getWorkQueue().schedule(new GetJobInfoWork(ccs.getJobManager(), gjif.getJobId(), new IPCResponder<JobInfo>(handle, mid))); break; + case DISTRIBUTE_JOB: + HyracksClientInterfaceFunctions.DistributeJobFunction djf = + (HyracksClientInterfaceFunctions.DistributeJobFunction) fn; + ccs.getWorkQueue().schedule(new DistributeJobWork(ccs, djf.getACGGFBytes(), jobIdFactory.create(), + new IPCResponder<JobId>(handle, mid))); + break; + case DESTROY_JOB: + 
HyracksClientInterfaceFunctions.DestroyJobFunction dsjf = + (HyracksClientInterfaceFunctions.DestroyJobFunction) fn; + ccs.getWorkQueue() + .schedule(new DestroyJobWork(ccs, dsjf.getJobId(), new IPCResponder<JobId>(handle, mid))); + break; case START_JOB: HyracksClientInterfaceFunctions.StartJobFunction sjf = (HyracksClientInterfaceFunctions.StartJobFunction) fn; JobId jobId = sjf.getJobId(); byte[] acggfBytes = null; + boolean predistributed = false; if (jobId == null) { + //The job is new jobId = jobIdFactory.create(); + acggfBytes = sjf.getACGGFBytes(); + } else { + //The job has been predistributed. We don't need to send an ActivityClusterGraph + predistributed = true; } - //TODO: only send these when the jobId is null - acggfBytes = sjf.getACGGFBytes(); ccs.getWorkQueue().schedule(new JobStartWork(ccs, sjf.getDeploymentId(), acggfBytes, sjf.getJobFlags(), - jobId, new IPCResponder<JobId>(handle, mid))); + jobId, new IPCResponder<JobId>(handle, mid), predistributed)); break; case GET_DATASET_DIRECTORY_SERIVICE_INFO: ccs.getWorkQueue().schedule(new GetDatasetDirectoryServiceInfoWork(ccs, diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java index 21fcf92..53d7620 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java @@ -23,6 +23,7 @@ import org.apache.hyracks.api.client.NodeControllerInfo; import org.apache.hyracks.control.cc.work.ApplicationMessageWork; +import org.apache.hyracks.control.cc.work.DistributedJobFailureWork; import org.apache.hyracks.control.cc.work.GetNodeControllersInfoWork; import 
org.apache.hyracks.control.cc.work.JobletCleanupNotificationWork; import org.apache.hyracks.control.cc.work.NodeHeartbeatWork; @@ -99,6 +100,11 @@ ccs.getWorkQueue().schedule(new TaskFailureWork(ccs, ntff.getJobId(), ntff.getTaskId(), ntff.getNodeId(), ntff.getExceptions())); break; + case DISTRIBUTED_JOB_FAILURE: + CCNCFunctions.ReportDistributedJobFailureFunction rdjf = + (CCNCFunctions.ReportDistributedJobFailureFunction) fn; + ccs.getWorkQueue().schedule(new DistributedJobFailureWork(rdjf.getJobId(), rdjf.getNodeId())); + break; case REGISTER_PARTITION_PROVIDER: CCNCFunctions.RegisterPartitionProviderFunction rppf = (CCNCFunctions.RegisterPartitionProviderFunction) fn; diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java index 37c4177..346f934 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java @@ -44,6 +44,7 @@ import org.apache.hyracks.api.context.ICCContext; import org.apache.hyracks.api.deployment.DeploymentId; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.resource.DefaultJobCapacityController; import org.apache.hyracks.api.job.resource.IJobCapacityController; import org.apache.hyracks.api.service.IControllerService; @@ -90,13 +91,15 @@ private final LogFile jobLog; - private final ServerContext serverCtx; + private ServerContext serverCtx; - private final WebServer webServer; + private WebServer webServer; private ClusterControllerInfo info; private CCApplicationContext appCtx; + + 
private final PreDistributedJobStore preDistributedJobStore = new PreDistributedJobStore(); private final WorkQueue workQueue; @@ -130,14 +133,6 @@ this.ccConfig = ccConfig; File jobLogFolder = new File(ccConfig.ccRoot, "logs/jobs"); jobLog = new LogFile(jobLogFolder); - serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.ccRoot)); - IIPCI ccIPCI = new ClusterControllerIPCI(this); - clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI, - new CCNCFunctions.SerializerDeserializer()); - IIPCI ciIPCI = new ClientInterfaceIPCI(this); - clientIPC = new IPCSystem(new InetSocketAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort), ciIPCI, - new JavaSerializationBasedPayloadSerializerDeserializer()); - webServer = new WebServer(this); // WorkQueue is in charge of heartbeat as well as other events. workQueue = new WorkQueue("ClusterController", Thread.MAX_PRIORITY); @@ -171,6 +166,14 @@ @Override public void start() throws Exception { LOGGER.log(Level.INFO, "Starting ClusterControllerService: " + this); + serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.ccRoot)); + IIPCI ccIPCI = new ClusterControllerIPCI(this); + clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI, + new CCNCFunctions.SerializerDeserializer()); + IIPCI ciIPCI = new ClientInterfaceIPCI(this); + clientIPC = new IPCSystem(new InetSocketAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort), ciIPCI, + new JavaSerializationBasedPayloadSerializerDeserializer()); + webServer = new WebServer(this); clusterIPC.start(); clientIPC.start(); webServer.setPort(ccConfig.httpPort); @@ -313,6 +316,10 @@ return nodeManager; } + public PreDistributedJobStore getPreDistributedJobStore() throws HyracksException { + return preDistributedJobStore; + } + public IResourceManager getResourceManager() { return resourceManager; } diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java new file mode 100644 index 0000000..c573ae8 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.control.cc; + +import java.util.Hashtable; +import java.util.Map; +import java.util.Set; + +import org.apache.hyracks.api.constraints.Constraint; +import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.job.ActivityClusterGraph; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.JobSpecification; + +public class PreDistributedJobStore { + + private final Map<JobId, PreDistributedJobDescriptor> preDistributedJobDescriptorMap; + + public PreDistributedJobStore() { + preDistributedJobDescriptorMap = new Hashtable<>(); + } + + public void addDistributedJobDescriptor(JobId jobId, ActivityClusterGraph activityClusterGraph, + JobSpecification jobSpecification, Set<Constraint> activityClusterGraphConstraints) + throws HyracksException { + if (preDistributedJobDescriptorMap.get(jobId) != null) { + throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId); + } + PreDistributedJobDescriptor descriptor = + new PreDistributedJobDescriptor(activityClusterGraph, jobSpecification, activityClusterGraphConstraints); + preDistributedJobDescriptorMap.put(jobId, descriptor); + } + + public void checkForExistingDistributedJobDescriptor(JobId jobId) throws HyracksException { + if (preDistributedJobDescriptorMap.get(jobId) != null) { + throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId); + } + } + + public PreDistributedJobDescriptor getDistributedJobDescriptor(JobId jobId) throws HyracksException { + PreDistributedJobDescriptor descriptor = preDistributedJobDescriptorMap.get(jobId); + if (descriptor == null) { + throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId); + } + return descriptor; + } + + public void removeDistributedJobDescriptor(JobId jobId) throws HyracksException { + PreDistributedJobDescriptor descriptor = preDistributedJobDescriptorMap.get(jobId); + if (descriptor == 
null) { + throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId); + } + preDistributedJobDescriptorMap.remove(jobId); + } + + public class PreDistributedJobDescriptor { + + private final ActivityClusterGraph activityClusterGraph; + + private final JobSpecification jobSpecification; + + private final Set<Constraint> activityClusterGraphConstraints; + + private PreDistributedJobDescriptor(ActivityClusterGraph activityClusterGraph, + JobSpecification jobSpecification, Set<Constraint> activityClusterGraphConstraints) { + this.activityClusterGraph = activityClusterGraph; + this.jobSpecification = jobSpecification; + this.activityClusterGraphConstraints = activityClusterGraphConstraints; + } + + public ActivityClusterGraph getActivityClusterGraph() { + return activityClusterGraph; + } + + public JobSpecification getJobSpecification() { + return jobSpecification; + } + + public Set<Constraint> getActivityClusterGraphConstraints() { + return activityClusterGraphConstraints; + } + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java index e43a59d..77b9b17 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java @@ -32,9 +32,9 @@ import org.apache.hyracks.api.application.IClusterLifecycleListener; import org.apache.hyracks.api.context.ICCContext; import org.apache.hyracks.api.exceptions.HyracksException; -import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory; import org.apache.hyracks.api.job.IJobLifecycleListener; import 
org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.service.IControllerService; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.common.application.ApplicationContext; @@ -93,10 +93,10 @@ } } - public synchronized void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf) + public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException { for (IJobLifecycleListener l : jobLifecycleListeners) { - l.notifyJobCreation(jobId, acggf); + l.notifyJobCreation(jobId, spec); } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java index 46a173e..c4cf38d 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java @@ -38,8 +38,8 @@ import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.HyracksException; -import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory; import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.control.common.dataset.ResultStateSweeper; import org.apache.hyracks.control.common.work.IResultCallback; @@ -72,7 +72,7 @@ } @Override - public synchronized void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf) + public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) throws 
HyracksException { if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info(getClass().getSimpleName() + " notified of new job " + jobId); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java index 3eece52..8f7b0cb 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java @@ -31,8 +31,6 @@ import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.hyracks.control.cc.cluster.INodeManager; -import org.apache.hyracks.control.cc.job.IJobManager; import org.apache.hyracks.api.comm.NetworkAddress; import org.apache.hyracks.api.constraints.Constraint; import org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression; @@ -54,7 +52,9 @@ import org.apache.hyracks.api.util.JavaSerializationUtils; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.cc.NodeControllerState; +import org.apache.hyracks.control.cc.cluster.INodeManager; import org.apache.hyracks.control.cc.job.ActivityClusterPlan; +import org.apache.hyracks.control.cc.job.IJobManager; import org.apache.hyracks.control.cc.job.JobRun; import org.apache.hyracks.control.cc.job.Task; import org.apache.hyracks.control.cc.job.TaskAttempt; @@ -65,6 +65,7 @@ import org.apache.hyracks.control.common.job.PartitionState; import org.apache.hyracks.control.common.job.TaskAttemptDescriptor; + public class JobExecutor { private static final Logger LOGGER = Logger.getLogger(JobExecutor.class.getName()); @@ -74,20 +75,28 @@ private final PartitionConstraintSolver solver; + private final boolean predistributed; + private 
final Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap; private final Set<TaskCluster> inProgressTaskClusters; private final Random random; - public JobExecutor(ClusterControllerService ccs, JobRun jobRun, Collection<Constraint> constraints) { + public JobExecutor(ClusterControllerService ccs, JobRun jobRun, Collection<Constraint> constraints, + boolean predistributed) { this.ccs = ccs; this.jobRun = jobRun; + this.predistributed = predistributed; solver = new PartitionConstraintSolver(); partitionProducingTaskClusterMap = new HashMap<PartitionId, TaskCluster>(); inProgressTaskClusters = new HashSet<TaskCluster>(); solver.addConstraints(constraints); random = new Random(); + } + + public boolean isPredistributed() { + return predistributed; } public JobRun getJobRun() { @@ -475,7 +484,7 @@ jobRun.getConnectorPolicyMap()); INodeManager nodeManager = ccs.getNodeManager(); try { - byte[] acgBytes = JavaSerializationUtils.serialize(acg); + byte[] acgBytes = predistributed ? null : JavaSerializationUtils.serialize(acg); for (Map.Entry<String, List<TaskAttemptDescriptor>> entry : taskAttemptMap.entrySet()) { String nodeId = entry.getKey(); final List<TaskAttemptDescriptor> taskDescriptors = entry.getValue(); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java index e3f9557..741e3db 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java @@ -35,7 +35,6 @@ import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.ActivityClusterGraph; -import 
org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.job.JobStatus; @@ -105,7 +104,7 @@ @Override public void add(JobRun jobRun) throws HyracksException { checkJob(jobRun); - JobSpecification job = jobRun.getActivityClusterGraphFactory().getJobSpecification(); + JobSpecification job = jobRun.getJobSpecification(); IJobCapacityController.JobSubmissionStatus status = jobCapacityController.allocate(job); switch (status) { case QUEUE: @@ -214,7 +213,7 @@ } // Releases cluster capacitys occupied by the job. - JobSpecification job = run.getActivityClusterGraphFactory().getJobSpecification(); + JobSpecification job = run.getJobSpecification(); jobCapacityController.release(job); // Picks the next job to execute. @@ -273,8 +272,10 @@ activeRunMap.put(jobId, run); CCApplicationContext appCtx = ccs.getApplicationContext(); - IActivityClusterGraphGeneratorFactory acggf = run.getActivityClusterGraphFactory(); - appCtx.notifyJobCreation(jobId, acggf); + JobSpecification spec = run.getJobSpecification(); + if (!run.getExecutor().isPredistributed()) { + appCtx.notifyJobCreation(jobId, spec); + } run.setStatus(JobStatus.RUNNING, null); executeJobInternal(run); callback.setValue(jobId); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java index 5682194..3aa9043 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java @@ -20,12 +20,14 @@ import java.io.PrintWriter; import java.io.StringWriter; +import java.util.EnumSet; import java.util.HashMap; 
import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.hyracks.api.constraints.Constraint; import org.apache.hyracks.api.dataflow.ActivityId; import org.apache.hyracks.api.dataflow.ConnectorDescriptorId; import org.apache.hyracks.api.dataflow.OperatorDescriptorId; @@ -40,9 +42,11 @@ import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory; import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.job.JobStatus; import org.apache.hyracks.api.partitions.PartitionId; import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.hyracks.control.cc.PreDistributedJobStore.PreDistributedJobDescriptor; import org.apache.hyracks.control.cc.executor.ActivityPartitionDetails; import org.apache.hyracks.control.cc.executor.JobExecutor; import org.apache.hyracks.control.cc.partitions.PartitionMatchMaker; @@ -59,13 +63,11 @@ private final JobId jobId; - private final IActivityClusterGraphGeneratorFactory acggf; - - private final IActivityClusterGraphGenerator acgg; + private final JobSpecification spec; private final ActivityClusterGraph acg; - private final JobExecutor scheduler; + private JobExecutor scheduler; private final Set<JobFlag> jobFlags; @@ -99,17 +101,13 @@ private final IResultCallback<JobId> callback; - public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId, - IActivityClusterGraphGeneratorFactory acggf, IActivityClusterGraphGenerator acgg, Set<JobFlag> jobFlags, - IResultCallback<JobId> callback) { + private JobRun(DeploymentId deploymentId, JobId jobId, Set<JobFlag> jobFlags, IResultCallback<JobId> callback, + JobSpecification spec, ActivityClusterGraph acg) { this.deploymentId = deploymentId; this.jobId = jobId; - this.acggf = acggf; - this.acgg = acgg; - this.acg = acgg.initialize(); - this.scheduler = new JobExecutor(ccs, this, 
acgg.getConstraints()); this.jobFlags = jobFlags; - this.callback = callback; + this.spec = spec; + this.acg = acg; activityClusterPlanMap = new HashMap<>(); pmm = new PartitionMatchMaker(); participatingNodeIds = new HashSet<>(); @@ -118,18 +116,37 @@ connectorPolicyMap = new HashMap<>(); operatorLocations = new HashMap<>(); createTime = System.currentTimeMillis(); + this.callback = callback; + } + + //Run a Pre-distributed job by passing the JobId + public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId, IResultCallback<JobId> callback, + PreDistributedJobDescriptor distributedJobDescriptor) + throws HyracksException { + this(deploymentId, jobId, EnumSet.noneOf(JobFlag.class), callback, + distributedJobDescriptor.getJobSpecification(), distributedJobDescriptor.getActivityClusterGraph()); + Set<Constraint> constaints = distributedJobDescriptor.getActivityClusterGraphConstraints(); + this.scheduler = new JobExecutor(ccs, this, constaints, true); + } + + //Run a new job by creating an ActivityClusterGraph + public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId, + IActivityClusterGraphGeneratorFactory acggf, IActivityClusterGraphGenerator acgg, Set<JobFlag> jobFlags, + IResultCallback<JobId> callback) { + this(deploymentId, jobId, jobFlags, callback, acggf.getJobSpecification(), acgg.initialize()); + this.scheduler = new JobExecutor(ccs, this, acgg.getConstraints(), false); } public DeploymentId getDeploymentId() { return deploymentId; } - public JobId getJobId() { - return jobId; + public JobSpecification getJobSpecification() { + return spec; } - public IActivityClusterGraphGeneratorFactory getActivityClusterGraphFactory() { - return acggf; + public JobId getJobId() { + return jobId; } public ActivityClusterGraph getActivityClusterGraph() { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java index eac9800..6cf75bb 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java @@ -68,7 +68,7 @@ Iterator<JobRun> runIterator = jobQueue.iterator(); while (runIterator.hasNext()) { JobRun run = runIterator.next(); - JobSpecification job = run.getActivityClusterGraphFactory().getJobSpecification(); + JobSpecification job = run.getJobSpecification(); // Cluster maximum capacity can change over time, thus we have to re-check if the job should be rejected // or not. try { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java new file mode 100644 index 0000000..df98252 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.control.cc.work; + +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.hyracks.control.cc.NodeControllerState; +import org.apache.hyracks.control.cc.cluster.INodeManager; +import org.apache.hyracks.control.common.work.IResultCallback; +import org.apache.hyracks.control.common.work.SynchronizableWork; + +public class DestroyJobWork extends SynchronizableWork { + private final ClusterControllerService ccs; + private final JobId jobId; + private final IResultCallback<JobId> callback; + + public DestroyJobWork(ClusterControllerService ccs, JobId jobId, IResultCallback<JobId> callback) { + this.jobId = jobId; + this.ccs = ccs; + this.callback = callback; + } + + @Override + protected void doRun() throws Exception { + try { + ccs.getPreDistributedJobStore().removeDistributedJobDescriptor(jobId); + INodeManager nodeManager = ccs.getNodeManager(); + for (NodeControllerState node : nodeManager.getAllNodeControllerStates()) { + node.getNodeController().destroyJob(jobId); + } + callback.setValue(jobId); + } catch (Exception e) { + callback.setException(e); + } + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java new file mode 100644 index 0000000..f0c3303 --- /dev/null +++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.control.cc.work; + +import java.util.EnumSet; + +import org.apache.hyracks.api.job.ActivityClusterGraph; +import org.apache.hyracks.api.job.IActivityClusterGraphGenerator; +import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory; +import org.apache.hyracks.api.job.JobFlag; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.util.JavaSerializationUtils; +import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.hyracks.control.cc.NodeControllerState; +import org.apache.hyracks.control.cc.application.CCApplicationContext; +import org.apache.hyracks.control.cc.cluster.INodeManager; +import org.apache.hyracks.control.common.deployment.DeploymentUtils; +import org.apache.hyracks.control.common.work.IResultCallback; +import org.apache.hyracks.control.common.work.SynchronizableWork; + +public class DistributeJobWork extends SynchronizableWork { + private final ClusterControllerService ccs; + private final byte[] acggfBytes; + 
private final JobId jobId; + private final IResultCallback<JobId> callback; + + public DistributeJobWork(ClusterControllerService ccs, byte[] acggfBytes, JobId jobId, + IResultCallback<JobId> callback) { + this.jobId = jobId; + this.ccs = ccs; + this.acggfBytes = acggfBytes; + this.callback = callback; + } + + @Override + protected void doRun() throws Exception { + try { + final CCApplicationContext appCtx = ccs.getApplicationContext(); + ccs.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(jobId); + IActivityClusterGraphGeneratorFactory acggf = + (IActivityClusterGraphGeneratorFactory) DeploymentUtils.deserialize(acggfBytes, null, appCtx); + IActivityClusterGraphGenerator acgg = + acggf.createActivityClusterGraphGenerator(jobId, appCtx, EnumSet.noneOf(JobFlag.class)); + ActivityClusterGraph acg = acgg.initialize(); + ccs.getPreDistributedJobStore().addDistributedJobDescriptor(jobId, acg, acggf.getJobSpecification(), + acgg.getConstraints()); + + appCtx.notifyJobCreation(jobId, acggf.getJobSpecification()); + + byte[] acgBytes = JavaSerializationUtils.serialize(acg); + + INodeManager nodeManager = ccs.getNodeManager(); + for (NodeControllerState node : nodeManager.getAllNodeControllerStates()) { + node.getNodeController().distributeJob(jobId, acgBytes); + } + + callback.setValue(jobId); + } catch (Exception e) { + callback.setException(e); + } + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributedJobFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributedJobFailureWork.java new file mode 100644 index 0000000..f7fa2a4 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributedJobFailureWork.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.control.cc.work; + +import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.control.common.work.SynchronizableWork; + +public class DistributedJobFailureWork extends SynchronizableWork { + protected final JobId jobId; + protected final String nodeId; + + public DistributedJobFailureWork(JobId jobId, String nodeId) { + this.jobId = jobId; + this.nodeId = nodeId; + } + + @Override + public void doRun() throws HyracksException { + throw HyracksException.create(ErrorCode.DISTRIBUTED_JOB_FAILURE, jobId, nodeId); + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java index fefd3b6..c608712 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java @@ -40,15 +40,17 
@@ private final DeploymentId deploymentId; private final JobId jobId; private final IResultCallback<JobId> callback; + private final boolean predestributed; public JobStartWork(ClusterControllerService ccs, DeploymentId deploymentId, byte[] acggfBytes, - EnumSet<JobFlag> jobFlags, JobId jobId, IResultCallback<JobId> callback) { + EnumSet<JobFlag> jobFlags, JobId jobId, IResultCallback<JobId> callback, boolean predestributed) { this.deploymentId = deploymentId; this.jobId = jobId; this.ccs = ccs; this.acggfBytes = acggfBytes; this.jobFlags = jobFlags; this.callback = callback; + this.predestributed = predestributed; } @Override @@ -56,11 +58,21 @@ IJobManager jobManager = ccs.getJobManager(); try { final CCApplicationContext appCtx = ccs.getApplicationContext(); - IActivityClusterGraphGeneratorFactory acggf = (IActivityClusterGraphGeneratorFactory) DeploymentUtils - .deserialize(acggfBytes, deploymentId, appCtx); - IActivityClusterGraphGenerator acgg = acggf.createActivityClusterGraphGenerator(jobId, appCtx, jobFlags); - JobRun run = new JobRun(ccs, deploymentId, jobId, acggf, acgg, jobFlags, callback); + JobRun run; + if (!predestributed) { + //Need to create the ActivityClusterGraph + IActivityClusterGraphGeneratorFactory acggf = (IActivityClusterGraphGeneratorFactory) DeploymentUtils + .deserialize(acggfBytes, deploymentId, appCtx); + IActivityClusterGraphGenerator acgg = + acggf.createActivityClusterGraphGenerator(jobId, appCtx, jobFlags); + run = new JobRun(ccs, deploymentId, jobId, acggf, acgg, jobFlags, callback); + } else { + //ActivityClusterGraph has already been distributed + run = new JobRun(ccs, deploymentId, jobId, callback, + ccs.getPreDistributedJobStore().getDistributedJobDescriptor(jobId)); + } jobManager.add(run); + } catch (Exception e) { callback.setException(e); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java index 5e1b856..88b8939 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java @@ -65,7 +65,7 @@ // Mocks an immediately executable job. JobRun run = mockJobRun(id); JobSpecification job = mock(JobSpecification.class); - when(run.getActivityClusterGraphFactory().getJobSpecification()).thenReturn(job); + when(run.getJobSpecification()).thenReturn(job); when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE); // Submits the job. @@ -81,7 +81,7 @@ // Mocks a deferred job. JobRun run = mockJobRun(id); JobSpecification job = mock(JobSpecification.class); - when(run.getActivityClusterGraphFactory().getJobSpecification()).thenReturn(job); + when(run.getJobSpecification()).thenReturn(job); when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE) .thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE); @@ -97,7 +97,7 @@ try { JobRun run = mockJobRun(8193); JobSpecification job = mock(JobSpecification.class); - when(run.getActivityClusterGraphFactory().getJobSpecification()).thenReturn(job); + when(run.getJobSpecification()).thenReturn(job); when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE) .thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE); jobManager.add(run); @@ -138,7 +138,7 @@ try { JobRun run = mockJobRun(1); JobSpecification job = mock(JobSpecification.class); - when(run.getActivityClusterGraphFactory().getJobSpecification()).thenReturn(job); + when(run.getJobSpecification()).thenReturn(job); when(jobCapacityController.allocate(job)) 
.thenThrow(HyracksException.create(ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY, "1", "0")); jobManager.add(run); @@ -162,14 +162,14 @@ // A normal run. JobRun run1 = mockJobRun(1); JobSpecification job1 = mock(JobSpecification.class); - when(run1.getActivityClusterGraphFactory().getJobSpecification()).thenReturn(job1); + when(run1.getJobSpecification()).thenReturn(job1); when(jobCapacityController.allocate(job1)).thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE); jobManager.add(run1); // A failure run. JobRun run2 = mockJobRun(2); JobSpecification job2 = mock(JobSpecification.class); - when(run2.getActivityClusterGraphFactory().getJobSpecification()).thenReturn(job2); + when(run2.getJobSpecification()).thenReturn(job2); when(jobCapacityController.allocate(job2)).thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE) .thenThrow(HyracksException.create(ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY, "1", "0")); jobManager.add(run2); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java index a0c0f95..4159594 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java @@ -44,6 +44,8 @@ public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, List<Exception> exceptions) throws Exception; + public void notifyDistributedJobFailure(JobId jobId, String nodeId) throws Exception; + public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception; public void notifyDeployBinary(DeploymentId deploymentId, String nodeId, DeploymentStatus status) 
throws Exception; diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java index 5c27a6f..a10f8f0 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java @@ -50,6 +50,10 @@ public void undeployBinary(DeploymentId deploymentId) throws Exception; + public void distributeJob(JobId jobId, byte[] planBytes) throws Exception; + + public void destroyJob(JobId jobId) throws Exception; + public void dumpState(String stateDumpId) throws Exception; public void shutdown(boolean terminateNCService) throws Exception; diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java index 4ee34ca..4eb1732 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java @@ -100,6 +100,10 @@ SHUTDOWN_REQUEST, SHUTDOWN_RESPONSE, + DISTRIBUTE_JOB, + DESTROY_JOB, + DISTRIBUTED_JOB_FAILURE, + STATE_DUMP_REQUEST, STATE_DUMP_RESPONSE, @@ -279,6 +283,31 @@ public List<Exception> getExceptions() { return exceptions; + } + } + + public static class ReportDistributedJobFailureFunction extends Function { + private static final long serialVersionUID = 1L; + + private final JobId jobId; + 
private final String nodeId; + + public ReportDistributedJobFailureFunction(JobId jobId, String nodeId) { + this.jobId = jobId; + this.nodeId = nodeId; + } + + @Override + public FunctionId getFunctionId() { + return FunctionId.DISTRIBUTED_JOB_FAILURE; + } + + public JobId getJobId() { + return jobId; + } + + public String getNodeId() { + return nodeId; } } @@ -670,6 +699,51 @@ } } + public static class DistributeJobFunction extends Function { + private static final long serialVersionUID = 1L; + + private final JobId jobId; + + private final byte[] acgBytes; + + public DistributeJobFunction(JobId jobId, byte[] acgBytes) { + this.jobId = jobId; + this.acgBytes = acgBytes; + } + + @Override + public FunctionId getFunctionId() { + return FunctionId.DISTRIBUTE_JOB; + } + + public JobId getJobId() { + return jobId; + } + + public byte[] getacgBytes() { + return acgBytes; + } + } + + public static class DestroyJobFunction extends Function { + private static final long serialVersionUID = 1L; + + private final JobId jobId; + + public DestroyJobFunction(JobId jobId) { + this.jobId = jobId; + } + + @Override + public FunctionId getFunctionId() { + return FunctionId.DESTROY_JOB; + } + + public JobId getJobId() { + return jobId; + } + } + public static class StartTasksFunction extends Function { private static final long serialVersionUID = 1L; diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java index ac6fc2c..83ef32b 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java +++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java @@ -141,6 +141,13 @@ } @Override + public void notifyDistributedJobFailure(JobId jobId, String nodeId) throws Exception { + CCNCFunctions.ReportDistributedJobFailureFunction fn = + new CCNCFunctions.ReportDistributedJobFailureFunction(jobId, nodeId); + ipcHandle.send(-1, fn, null); + } + + @Override public void getNodeControllerInfos() throws Exception { ipcHandle.send(-1, new CCNCFunctions.GetNodeControllersInfoFunction(), null); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java index 0d59b8d..2a8464e 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java @@ -84,6 +84,18 @@ } @Override + public void distributeJob(JobId jobId, byte[] planBytes) throws Exception { + CCNCFunctions.DistributeJobFunction fn = new CCNCFunctions.DistributeJobFunction(jobId, planBytes); + ipcHandle.send(-1, fn, null); + } + + @Override + public void destroyJob(JobId jobId) throws Exception { + CCNCFunctions.DestroyJobFunction fn = new CCNCFunctions.DestroyJobFunction(jobId); + ipcHandle.send(-1, fn, null); + } + + @Override public void dumpState(String stateDumpId) throws Exception { CCNCFunctions.StateDumpRequestFunction dsf = new CCNCFunctions.StateDumpRequestFunction(stateDumpId); ipcHandle.send(-1, dsf, null); diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java index 93ccaa4..e6278be 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java @@ -26,6 +26,8 @@ import org.apache.hyracks.control.nc.work.ApplicationMessageWork; import org.apache.hyracks.control.nc.work.CleanupJobletWork; import org.apache.hyracks.control.nc.work.DeployBinaryWork; +import org.apache.hyracks.control.nc.work.DestroyJobWork; +import org.apache.hyracks.control.nc.work.DistributeJobWork; import org.apache.hyracks.control.nc.work.ReportPartitionAvailabilityWork; import org.apache.hyracks.control.nc.work.StartTasksWork; import org.apache.hyracks.control.nc.work.StateDumpWork; @@ -99,6 +101,16 @@ ncs.getWorkQueue().schedule(new UnDeployBinaryWork(ncs, ndbf.getDeploymentId())); return; + case DISTRIBUTE_JOB: + CCNCFunctions.DistributeJobFunction djf = (CCNCFunctions.DistributeJobFunction) fn; + ncs.getWorkQueue().schedule(new DistributeJobWork(ncs, djf.getJobId(), djf.getacgBytes())); + return; + + case DESTROY_JOB: + CCNCFunctions.DestroyJobFunction dsjf = (CCNCFunctions.DestroyJobFunction) fn; + ncs.getWorkQueue().schedule(new DestroyJobWork(ncs, dsjf.getJobId())); + return; + case STATE_DUMP_REQUEST: final CCNCFunctions.StateDumpRequestFunction dsrf = (StateDumpRequestFunction) fn; ncs.getWorkQueue().schedule(new StateDumpWork(ncs, dsrf.getStateDumpId())); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index 19f01c1..bf0ddb6 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -46,6 +46,8 @@ import org.apache.hyracks.api.comm.NetworkAddress; import org.apache.hyracks.api.dataset.IDatasetPartitionManager; import org.apache.hyracks.api.deployment.DeploymentId; +import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.io.IODeviceHandle; import org.apache.hyracks.api.job.ActivityClusterGraph; import org.apache.hyracks.api.job.JobId; @@ -94,11 +96,11 @@ private final IOManager ioManager; - private final IPCSystem ipc; + private IPCSystem ipc; - private final PartitionManager partitionManager; + private PartitionManager partitionManager; - private final NetworkManager netManager; + private NetworkManager netManager; private IDatasetPartitionManager datasetPartitionManager; @@ -155,18 +157,11 @@ public NodeControllerService(NCConfig ncConfig) throws Exception { this.ncConfig = ncConfig; id = ncConfig.nodeId; - ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, ncConfig.clusterNetPort), - new NodeControllerIPCI(this), - new CCNCFunctions.SerializerDeserializer()); ioManager = new IOManager(IODeviceHandle.getDevices(ncConfig.ioDevices)); if (id == null) { throw new Exception("id not set"); } - partitionManager = new PartitionManager(this); - netManager = new NetworkManager(ncConfig.dataIPAddress, ncConfig.dataPort, partitionManager, - ncConfig.nNetThreads, ncConfig.nNetBuffers, ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort, - FullFrameChannelInterfaceFactory.INSTANCE); lccm = new 
LifeCycleComponentManager(); workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread. @@ -244,7 +239,13 @@ @Override public void start() throws Exception { LOGGER.log(Level.INFO, "Starting NodeControllerService"); + ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, ncConfig.clusterNetPort), + new NodeControllerIPCI(this), new CCNCFunctions.SerializerDeserializer()); ipc.start(); + partitionManager = new PartitionManager(this); + netManager = new NetworkManager(ncConfig.dataIPAddress, ncConfig.dataPort, partitionManager, + ncConfig.nNetThreads, ncConfig.nNetBuffers, ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort, + FullFrameChannelInterfaceFactory.INSTANCE); netManager.start(); startApplication(); @@ -365,8 +366,28 @@ return jobletMap; } - public Map<JobId, ActivityClusterGraph> getActivityClusterGraphMap() { - return activityClusterGraphMap; + public void storeActivityClusterGraph(JobId jobId, ActivityClusterGraph acg) throws HyracksException { + if (activityClusterGraphMap.get(jobId) != null) { + throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId); + } + activityClusterGraphMap.put(jobId, acg); + } + + public void removeActivityClusterGraph(JobId jobId) throws HyracksException { + if (activityClusterGraphMap.get(jobId) == null) { + throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId); + } + activityClusterGraphMap.remove(jobId); + } + + public void checkForDuplicateDistributedJob(JobId jobId) throws HyracksException { + if (activityClusterGraphMap.get(jobId) != null) { + throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId); + } + } + + public ActivityClusterGraph getActivityClusterGraph(JobId jobId) throws HyracksException { + return activityClusterGraphMap.get(jobId); } public NetworkManager getNetworkManager() { diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java new file mode 100644 index 0000000..55dd01e --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hyracks.control.nc.work; + +import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.control.common.work.AbstractWork; +import org.apache.hyracks.control.nc.NodeControllerService; + +/** + * destroy a pre-distributed job + * + */ +public class DestroyJobWork extends AbstractWork { + + private final NodeControllerService ncs; + private final JobId jobId; + + public DestroyJobWork(NodeControllerService ncs, JobId jobId) { + this.ncs = ncs; + this.jobId = jobId; + } + + @Override + public void run() { + try { + ncs.removeActivityClusterGraph(jobId); + } catch (HyracksException e) { + try { + ncs.getClusterController().notifyDistributedJobFailure(jobId, ncs.getId()); + } catch (Exception e1) { + e1.printStackTrace(); + } + } + } + +} diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java new file mode 100644 index 0000000..3a4f6ac --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.control.nc.work; + +import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.job.ActivityClusterGraph; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.control.common.deployment.DeploymentUtils; +import org.apache.hyracks.control.common.work.AbstractWork; +import org.apache.hyracks.control.nc.NodeControllerService; + +/** + * pre-distribute a job that can be executed later + * + */ +public class DistributeJobWork extends AbstractWork { + + private final NodeControllerService ncs; + private final byte[] acgBytes; + private final JobId jobId; + + public DistributeJobWork(NodeControllerService ncs, JobId jobId, byte[] acgBytes) { + this.ncs = ncs; + this.jobId = jobId; + this.acgBytes = acgBytes; + } + + @Override + public void run() { + try { + ncs.checkForDuplicateDistributedJob(jobId); + ActivityClusterGraph acg = + (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, null, ncs.getApplicationContext()); + ncs.storeActivityClusterGraph(jobId, acg); + } catch (HyracksException e) { + try { + ncs.getClusterController().notifyDistributedJobFailure(jobId, ncs.getId()); + } catch (Exception e1) { + e1.printStackTrace(); + } + } + + } + +} diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java index 803f15a..6cd9fa2 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java @@ -46,6 +46,7 @@ import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.deployment.DeploymentId; +import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.ActivityCluster; @@ -186,17 +187,12 @@ Map<JobId, Joblet> jobletMap = ncs.getJobletMap(); Joblet ji = jobletMap.get(jobId); if (ji == null) { - Map<JobId, ActivityClusterGraph> acgMap = ncs.getActivityClusterGraphMap(); - ActivityClusterGraph acg = acgMap.get(jobId); + ActivityClusterGraph acg = ncs.getActivityClusterGraph(jobId); if (acg == null) { if (acgBytes == null) { - throw new HyracksException("Joblet was not found. 
This job was most likely aborted."); + throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId); } acg = (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, deploymentId, appCtx); - if (flags.contains(JobFlag.STORE_JOB)) { - //TODO: Right now the map is append-only - acgMap.put(jobId, acg); - } } ji = new Joblet(ncs, deploymentId, jobId, appCtx, acg); jobletMap.put(jobId, ji); diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java index b51a578..a7677f8 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java @@ -63,10 +63,10 @@ public static final String NC1_ID = "nc1"; public static final String NC2_ID = "nc2"; - private static ClusterControllerService cc; + protected static ClusterControllerService cc; protected static NodeControllerService nc1; protected static NodeControllerService nc2; - private static IHyracksClientConnection hcc; + protected static IHyracksClientConnection hcc; private final List<File> outputFiles; private static AtomicInteger aInteger = new AtomicInteger(0); diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java index efbb9d2..160336a 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java @@ -114,6 +114,11 @@ @Test public void optimizedSortMergeTest02() throws Exception { + JobSpecification spec = createSortMergeJobSpec(); + runTest(spec); + } + + public static JobSpecification createSortMergeJobSpec() throws Exception { JobSpecification spec = new JobSpecification(); FileSplit[] ordersSplits = new FileSplit[] { @@ -156,19 +161,17 @@ spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0); - spec.connect( - new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(new int[] { - 1, 0 }, new IBinaryHashFunctionFactory[] { - PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY), - PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), new int[] { 1, 0 }, - new IBinaryComparatorFactory[] { - PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), - PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, - new UTF8StringNormalizedKeyComputerFactory()), sorter, 0, filter, 0); + spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory( + new int[] { 1, 0 }, + new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY), + PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), + new int[] { 1, 0 }, + new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), + PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, + new UTF8StringNormalizedKeyComputerFactory()), sorter, 0, filter, 0); spec.connect(new OneToOneConnectorDescriptor(spec), filter, 0, printer, 0); - - runTest(spec); + return spec; } } diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java new file mode 100644 index 0000000..2509515 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.tests.integration; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.verify; + +import java.io.File; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.commons.io.FileUtils; +import org.apache.hyracks.api.client.HyracksConnection; +import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.hyracks.control.common.controllers.CCConfig; +import org.apache.hyracks.control.common.controllers.NCConfig; +import org.apache.hyracks.control.nc.NodeControllerService; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; + +public class PredistributedJobsTest { + private static final Logger LOGGER = Logger.getLogger(PredistributedJobsTest.class.getName()); + + private static final String NC1_ID = "nc1"; + private static final String NC2_ID = "nc2"; + + private static ClusterControllerService cc; + private static NodeControllerService nc1; + private static NodeControllerService nc2; + private static IHyracksClientConnection hcc; + + @BeforeClass + public static void init() throws Exception { + CCConfig ccConfig = new CCConfig(); + ccConfig.clientNetIpAddress = "127.0.0.1"; + ccConfig.clientNetPort = 39000; + ccConfig.clusterNetIpAddress = "127.0.0.1"; + ccConfig.clusterNetPort = 39001; + ccConfig.profileDumpPeriod = 10000; + FileUtils.deleteQuietly(new File("target" + File.separator + "data")); + FileUtils.copyDirectory(new File("data"), new File("target" + File.separator + "data")); + File outDir = new File("target" + File.separator + "ClusterController"); + outDir.mkdirs(); + File ccRoot = File.createTempFile(PredistributedJobsTest.class.getName(), ".data", outDir); + ccRoot.delete(); + ccRoot.mkdir(); +
ccConfig.ccRoot = ccRoot.getAbsolutePath(); + ClusterControllerService ccBase = new ClusterControllerService(ccConfig); + cc = Mockito.spy(ccBase); + cc.start(); + + NCConfig ncConfig1 = new NCConfig(); + ncConfig1.ccHost = "localhost"; + ncConfig1.ccPort = 39001; + ncConfig1.clusterNetIPAddress = "127.0.0.1"; + ncConfig1.dataIPAddress = "127.0.0.1"; + ncConfig1.resultIPAddress = "127.0.0.1"; + ncConfig1.nodeId = NC1_ID; + ncConfig1.ioDevices = System.getProperty("user.dir") + File.separator + "target" + File.separator + "data" + + File.separator + "device0"; + NodeControllerService nc1Base = new NodeControllerService(ncConfig1); + nc1 = Mockito.spy(nc1Base); + nc1.start(); + + NCConfig ncConfig2 = new NCConfig(); + ncConfig2.ccHost = "localhost"; + ncConfig2.ccPort = 39001; + ncConfig2.clusterNetIPAddress = "127.0.0.1"; + ncConfig2.dataIPAddress = "127.0.0.1"; + ncConfig2.resultIPAddress = "127.0.0.1"; + ncConfig2.nodeId = NC2_ID; + ncConfig2.ioDevices = System.getProperty("user.dir") + File.separator + "target" + File.separator + "data" + + File.separator + "device1"; + NodeControllerService nc2Base = new NodeControllerService(ncConfig2); + nc2 = Mockito.spy(nc2Base); + nc2.start(); + + hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort); + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath()); + } + } + + @Test + public void DistributedTest() throws Exception { + JobSpecification spec1 = UnionTest.createUnionJobSpec(); + JobSpecification spec2 = HeapSortMergeTest.createSortMergeJobSpec(); + + // distribute both jobs; cc, nc1 and nc2 are Mockito spies so the resulting calls can be verified + JobId jobId1 = hcc.distributeJob(spec1); + JobId jobId2 = hcc.distributeJob(spec2); + + // wait (up to 5 s per check) for distribution to finish; Mockito counts are cumulative per spy + // cc reads the store twice per job (duplicate check + insert); this test's own calls on cc also count + verify(cc, Mockito.timeout(5000).times(4)).getPreDistributedJobStore(); + verify(nc1, Mockito.timeout(5000).times(2)).storeActivityClusterGraph(any(), any()); + verify(nc2,
Mockito.timeout(5000).times(2)).storeActivityClusterGraph(any(), any()); + verify(nc1, Mockito.timeout(5000).times(2)).checkForDuplicateDistributedJob(any()); + verify(nc2, Mockito.timeout(5000).times(2)).checkForDuplicateDistributedJob(any()); + + // confirm that both jobs are stored on both NCs and registered on the CC + Assert.assertTrue("job 1 not distributed to every NC", nc1.getActivityClusterGraph(jobId1) != null && nc2.getActivityClusterGraph(jobId1) != null); + Assert.assertTrue("job 2 not distributed to every NC", nc1.getActivityClusterGraph(jobId2) != null && nc2.getActivityClusterGraph(jobId2) != null); + Assert.assertNotNull(cc.getPreDistributedJobStore().getDistributedJobDescriptor(jobId1)); + Assert.assertNotNull(cc.getPreDistributedJobStore().getDistributedJobDescriptor(jobId2)); + + // run the first job + hcc.startJob(jobId1); + hcc.waitForCompletion(jobId1); + + // destroy the first job + hcc.destroyJob(jobId1); + + // wait for the destroy to reach the CC and both NCs (counts are cumulative since setup) + verify(cc, Mockito.timeout(5000).times(8)).getPreDistributedJobStore(); + verify(nc1, Mockito.timeout(5000).times(1)).removeActivityClusterGraph(any()); + verify(nc2, Mockito.timeout(5000).times(1)).removeActivityClusterGraph(any()); + + // confirm the first job is destroyed; the CC store check below presumably throws if jobId1 is still registered (TODO confirm) + Assert.assertTrue("job 1 still present on an NC", nc1.getActivityClusterGraph(jobId1) == null && nc2.getActivityClusterGraph(jobId1) == null); + cc.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(jobId1); + + // run the second job + hcc.startJob(jobId2); + hcc.waitForCompletion(jobId2); + + // run the second job again: a pre-distributed job is expected to be startable repeatedly + hcc.startJob(jobId2); + hcc.waitForCompletion(jobId2); + + // destroy the second job + hcc.destroyJob(jobId2); + + // wait for the destroy to reach the CC and both NCs (counts are cumulative since setup) + verify(cc, Mockito.timeout(5000).times(12)).getPreDistributedJobStore(); + verify(nc1, Mockito.timeout(5000).times(2)).removeActivityClusterGraph(any()); + verify(nc2, Mockito.timeout(5000).times(2)).removeActivityClusterGraph(any()); + + // confirm the second job is destroyed; the CC store check below presumably throws if jobId2 is still registered (TODO confirm) + Assert.assertTrue("job 2 still present on an NC", nc1.getActivityClusterGraph(jobId2) == null && nc2.getActivityClusterGraph(jobId2) == null); +
cc.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(jobId2); + } + + @AfterClass + public static void deinit() throws Exception { + nc2.stop(); + nc1.stop(); + cc.stop(); + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java index 02cab8f..542f037 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java @@ -44,6 +44,11 @@ public class UnionTest extends AbstractIntegrationTest { @Test public void union01() throws Exception { + JobSpecification spec = createUnionJobSpec(); + runTest(spec); + } + + public static JobSpecification createUnionJobSpec() throws Exception { JobSpecification spec = new JobSpecification(); IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] { @@ -82,6 +87,6 @@ spec.connect(new OneToOneConnectorDescriptor(spec), unionAll, 0, printer, 0); spec.addRoot(printer); - runTest(spec); + return spec; } } -- To view, visit https://asterix-gerrit.ics.uci.edu/1377 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I59c3422d5c1ab7756a6a4685ac527dfe50434954 Gerrit-PatchSet: 26 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Steven Jacobs <[email protected]> Gerrit-Reviewer: Ian Maxon <[email protected]> Gerrit-Reviewer: Ildar Absalyamov <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Carey <[email protected]> Gerrit-Reviewer: Steven Jacobs <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: Xikui
Wang <[email protected]> Gerrit-Reviewer: Yingyi Bu <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
