Michael Blow has submitted this change and it was merged.

Change subject: [NO ISSUE][HYR] Ensure IJobLifecycleListener is notified on cancelled queued jobs
......................................................................
[NO ISSUE][HYR] Ensure IJobLifecycleListener is notified on cancelled queued jobs Change-Id: I7e26c9d1015725f895876f5873eccd3f86b17653 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2624 Sonar-Qube: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java 3 files changed, 64 insertions(+), 50 deletions(-) Approvals: Anon. E. Moose #1000171: Jenkins: Verified; No violations found; ; Verified Michael Blow: Looks good to me, but someone else must approve Murtadha Hubail: Looks good to me, approved diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java index 7aa84e2..210779e 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java @@ -40,6 +40,17 @@ return new HyracksException(cause); } + public static HyracksException wrapOrThrowUnchecked(Throwable cause) { + if (cause instanceof Error) { + throw (Error) cause; + } else if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } else if (cause instanceof HyracksException) { + return (HyracksException) cause; + } + return new HyracksException(cause); + } + public static HyracksException create(int code, 
Serializable... params) { return new HyracksException(ErrorCode.HYRACKS, code, ErrorCode.getErrorMessage(code), params); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java index 9fd1a02..3ba25f5 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java @@ -31,12 +31,14 @@ import java.util.Set; import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.ActivityClusterGraph; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.job.JobStatus; import org.apache.hyracks.api.job.resource.IJobCapacityController; +import org.apache.hyracks.api.util.ExceptionUtils; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.cc.NodeControllerState; import org.apache.hyracks.control.cc.application.CCServiceContext; @@ -65,8 +67,7 @@ private final IJobCapacityController jobCapacityController; private IJobQueue jobQueue; - public JobManager(CCConfig ccConfig, ClusterControllerService ccs, IJobCapacityController jobCapacityController) - throws HyracksException { + public JobManager(CCConfig ccConfig, ClusterControllerService ccs, IJobCapacityController jobCapacityController) { this.ccs = ccs; this.jobCapacityController = jobCapacityController; try { @@ -116,6 +117,8 @@ case EXECUTE: executeJob(jobRun); break; + default: + throw new IllegalStateException("unknown submission status: " + status); } } @@ -136,9 
+139,18 @@ List<Exception> exceptions = Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobId)); // Since the job has not been executed, we only need to update its status and lifecyle here. - jobRun.setStatus(JobStatus.FAILURE, exceptions); + jobRun.setStatus(JobStatus.FAILURE_BEFORE_EXECUTION, exceptions); runMapArchive.put(jobId, jobRun); runMapHistory.put(jobId, exceptions); + CCServiceContext serviceCtx = ccs.getContext(); + if (serviceCtx != null) { + try { + serviceCtx.notifyJobFinish(jobId, JobStatus.FAILURE_BEFORE_EXECUTION, exceptions); + } catch (Exception e) { + LOGGER.error("Exception notifying cancel on pending job {}", jobId, e); + throw HyracksDataException.create(e); + } + } } callback.setValue(null); } @@ -152,14 +164,12 @@ finalComplete(run); return; } - JobId jobId = run.getJobId(); - HyracksException caughtException = null; if (run.getPendingStatus() != null && run.getCleanupPendingNodeIds().isEmpty()) { finalComplete(run); return; } if (run.getPendingStatus() != null) { - LOGGER.warn("Ignoring duplicate cleanup for JobRun with id: " + jobId); + LOGGER.warn("Ignoring duplicate cleanup for JobRun with id: {}", run::getJobId); return; } Set<String> targetNodes = run.getParticipatingNodeIds(); @@ -168,38 +178,40 @@ run.setPendingStatus(status, exceptions); } - if (targetNodes != null && !targetNodes.isEmpty()) { - INodeManager nodeManager = ccs.getNodeManager(); - Set<String> toDelete = new HashSet<>(); - for (String n : targetNodes) { - NodeControllerState ncs = nodeManager.getNodeControllerState(n); - try { - if (ncs == null) { - toDelete.add(n); - } else { - ncs.getNodeController().cleanUpJoblet(jobId, status); - } - } catch (Exception e) { - LOGGER.log(Level.ERROR, e.getMessage(), e); - if (caughtException == null) { - caughtException = HyracksException.create(e); - } else { - caughtException.addSuppressed(e); - } - } - } - targetNodes.removeAll(toDelete); - run.getCleanupPendingNodeIds().removeAll(toDelete); - if 
(run.getCleanupPendingNodeIds().isEmpty()) { - finalComplete(run); - } + if (!targetNodes.isEmpty()) { + cleanupJobOnNodes(run, status, targetNodes); } else { finalComplete(run); } + } + + private void cleanupJobOnNodes(JobRun run, JobStatus status, Set<String> targetNodes) throws HyracksException { + Throwable caughtException = null; + JobId jobId = run.getJobId(); + INodeManager nodeManager = ccs.getNodeManager(); + Set<String> toDelete = new HashSet<>(); + for (String n : targetNodes) { + NodeControllerState ncs = nodeManager.getNodeControllerState(n); + if (ncs == null) { + toDelete.add(n); + } else { + try { + ncs.getNodeController().cleanUpJoblet(jobId, status); + } catch (Exception e) { + LOGGER.error("Exception cleaning up joblet {} on node {}", jobId, n, e); + caughtException = ExceptionUtils.suppress(caughtException, e); + } + } + } + targetNodes.removeAll(toDelete); + run.getCleanupPendingNodeIds().removeAll(toDelete); + if (run.getCleanupPendingNodeIds().isEmpty()) { + finalComplete(run); + } // throws caught exceptions if any if (caughtException != null) { - throw caughtException; + throw HyracksException.wrapOrThrowUnchecked(caughtException); } } @@ -207,13 +219,13 @@ public void finalComplete(JobRun run) throws HyracksException { checkJob(run); JobId jobId = run.getJobId(); - HyracksException caughtException = null; + Throwable caughtException = null; CCServiceContext serviceCtx = ccs.getContext(); if (serviceCtx != null) { try { serviceCtx.notifyJobFinish(jobId, run.getPendingStatus(), run.getPendingExceptions()); - } catch (HyracksException e) { - LOGGER.log(Level.ERROR, e.getMessage(), e); + } catch (Exception e) { + LOGGER.error("Exception notifying job finish {}", jobId, e); caughtException = e; } } @@ -224,18 +236,14 @@ runMapHistory.put(jobId, run.getExceptions()); if (run.getActivityClusterGraph().isReportTaskDetails()) { - /** + /* * log job details when profiling is enabled */ try { ccs.getJobLogFile().log(createJobLogObject(run)); } catch 
(Exception e) { - LOGGER.log(Level.ERROR, e.getMessage(), e); - if (caughtException == null) { - caughtException = HyracksException.create(e); - } else { - caughtException.addSuppressed(e); - } + LOGGER.error("Exception reporting task details for job {}", jobId, e); + caughtException = ExceptionUtils.suppress(caughtException, e); } } @@ -248,7 +256,7 @@ // throws caught exceptions if any if (caughtException != null) { - throw caughtException; + throw HyracksException.wrapOrThrowUnchecked(caughtException); } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java index e4699c7..5b98260 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java @@ -209,17 +209,12 @@ } public void registerOperatorLocation(OperatorDescriptorId op, int partition, String location) { - Map<Integer, String> locations = operatorLocations.get(op); - if (locations == null) { - locations = new HashMap<Integer, String>(); - operatorLocations.put(op, locations); - } - locations.put(partition, location); + operatorLocations.computeIfAbsent(op, k -> new HashMap<>()).put(partition, location); } @Override public synchronized void waitForCompletion() throws Exception { - while (status != JobStatus.TERMINATED && status != JobStatus.FAILURE) { + while (status == JobStatus.PENDING || status == JobStatus.RUNNING) { wait(); } if (exceptions != null && !exceptions.isEmpty()) { -- To view, visit https://asterix-gerrit.ics.uci.edu/2624 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I7e26c9d1015725f895876f5873eccd3f86b17653 Gerrit-PatchSet: 3 
Gerrit-Project: asterixdb
Gerrit-Branch: release-0.9.4-pre-rc
Gerrit-Owner: Michael Blow <[email protected]>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Michael Blow <[email protected]>
Gerrit-Reviewer: Murtadha Hubail <[email protected]>
Gerrit-Reviewer: Till Westmann <[email protected]>
Gerrit-Reviewer: abdullah alamoudi <[email protected]>
