Till Westmann has submitted this change and it was merged. Change subject: [ASTERIXDB-1982][FAIL] Unify runtime error reporting ......................................................................
[ASTERIXDB-1982][FAIL] Unify runtime error reporting - user model changes: no - storage format changes: no - interface changes: no Details: - Unify error reporting in result distribution and operator pipeline. ASTERIXDB-1982 is sporadic because the order of DatasetDirectoryService.reportJobFailure(...) and DatasetDirectoryService.reportResultPartitionFailure(...) is not deterministic and the latter can override the former; - Make the order of setException and addWaiter irrelevant, so that an exception set before a waiter is added is no longer sporadically missed; - Fix the exception list in Task to be thread-safe. Change-Id: I36f243c98876ff40e2539ca9241ff6d19fee929f Reviewed-on: https://asterix-gerrit.ics.uci.edu/1883 Tested-by: Jenkins <[email protected]> BAD: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> Reviewed-by: Till Westmann <[email protected]> --- M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java D hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionFailureWork.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java 12 files changed, 12 insertions(+), 132 deletions(-) Approvals: Murtadha Hubail: Looks good to me, approved Till Westmann: Looks good to me, but someone else must approve Jenkins: Verified; No violations found; Verified Objections: Jenkins: Violations found diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java index f79ce53..3119ddd 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java @@ -21,7 +21,6 @@ import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksException; -import org.apache.hyracks.api.io.IWorkspaceFileFactory; import org.apache.hyracks.api.job.JobId; public interface IDatasetPartitionManager extends IDatasetManager { @@ -33,8 +32,6 @@ public void reportPartitionWriteCompletion(JobId jobId, ResultSetId resultSetId, int partition) throws HyracksException; - - public void reportPartitionFailure(JobId 
jobId, ResultSetId resultSetId, int partition) throws HyracksException; public void initializeDatasetPartitionReader(JobId jobId, ResultSetId resultSetId, int partition, IFrameWriter noc) throws HyracksException; diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java index 7fdf106..def4c83 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Queue; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.Semaphore; @@ -213,9 +214,9 @@ cancelTasks(tasks, startSemaphore, completeSemaphore); Thread.currentThread().interrupt(); throw HyracksDataException.create(e); - } catch (Exception e) { + } catch (ExecutionException e) { cancelTasks(tasks, startSemaphore, completeSemaphore); - throw HyracksDataException.create(e); + throw HyracksDataException.create(e.getCause()); } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java index 53d7620..350984c 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java @@ -36,7 +36,6 @@ import 
org.apache.hyracks.control.cc.work.RegisterPartitionRequestWork; import org.apache.hyracks.control.cc.work.RegisterResultPartitionLocationWork; import org.apache.hyracks.control.cc.work.ReportProfilesWork; -import org.apache.hyracks.control.cc.work.ReportResultPartitionFailureWork; import org.apache.hyracks.control.cc.work.ReportResultPartitionWriteCompletionWork; import org.apache.hyracks.control.cc.work.TaskCompleteWork; import org.apache.hyracks.control.cc.work.TaskFailureWork; @@ -129,12 +128,6 @@ (CCNCFunctions.ReportResultPartitionWriteCompletionFunction) fn; ccs.getWorkQueue().schedule(new ReportResultPartitionWriteCompletionWork(ccs, rrpwc.getJobId(), rrpwc.getResultSetId(), rrpwc.getPartition())); - break; - case REPORT_RESULT_PARTITION_FAILURE: - CCNCFunctions.ReportResultPartitionFailureFunction rrpf = - (CCNCFunctions.ReportResultPartitionFailureFunction) fn; - ccs.getWorkQueue().schedule(new ReportResultPartitionFailureWork(ccs, - rrpf.getJobId(), rrpf.getResultSetId(), rrpf.getPartition())); break; case SEND_APPLICATION_MESSAGE: CCNCFunctions.SendApplicationMessageFunction rsf = diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java index 3cc41c5..8401fcf 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java @@ -149,16 +149,6 @@ } @Override - public synchronized void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) { - DatasetJobRecord djr = getDatasetJobRecord(jobId); - if (djr != null) { - djr.fail(rsId, partition); - } - 
jobResultLocations.get(jobId).setException(new Exception()); - notifyAll(); - } - - @Override public synchronized void reportJobFailure(JobId jobId, List<Exception> exceptions) { DatasetJobRecord djr = getDatasetJobRecord(jobId); if (djr != null) { @@ -270,6 +260,7 @@ private DatasetJobRecord record; private Waiters waiters; + private Exception exception; JobResultInfo(DatasetJobRecord record, Waiters waiters) { this.record = record; @@ -286,6 +277,10 @@ waiters = new Waiters(); } waiters.put(rsId, new Waiter(knownRecords, callback)); + if (exception != null) { + // Exception was set before the waiter is added. + setException(exception); + } } Waiter removeWaiter(ResultSetId rsId) { @@ -302,6 +297,8 @@ waiters.remove(rsId).callback.setException(exception); } } + // Caches the exception anyway for future added waiters. + this.exception = exception; } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java index 663a53a..68d6c16 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java @@ -41,8 +41,6 @@ public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws HyracksDataException; - public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition); - public void reportJobFailure(JobId jobId, List<Exception> exceptions); public Status getResultStatus(JobId jobId, ResultSetId rsId) throws HyracksDataException; diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionFailureWork.java deleted file mode 100644 index 2de89db..0000000 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionFailureWork.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.hyracks.control.cc.work; - -import org.apache.hyracks.api.dataset.ResultSetId; -import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.control.cc.ClusterControllerService; -import org.apache.hyracks.control.common.work.AbstractWork; - -public class ReportResultPartitionFailureWork extends AbstractWork { - private final ClusterControllerService ccs; - - private final JobId jobId; - - private final ResultSetId rsId; - - private final int partition; - - public ReportResultPartitionFailureWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId, int partition) { - this.ccs = ccs; - this.jobId = jobId; - this.rsId = rsId; - this.partition = partition; - } - - @Override - public void run() { - ccs.getDatasetDirectoryService().reportResultPartitionFailure(jobId, rsId, partition); - } - - @Override - public String toString() { - return "JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" + partition; - } -} diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java index 4159594..ec8e045 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java @@ -69,8 +69,6 @@ public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws Exception; - public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) throws Exception; - public void getNodeControllerInfos() throws Exception; public void notifyThreadDump(String nodeId, String requestId, String threadDumpJSON) throws Exception; diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java index 620033c..d42c4a8 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java @@ -83,7 +83,6 @@ REGISTER_PARTITION_REQUEST, REGISTER_RESULT_PARTITION_LOCATION, REPORT_RESULT_PARTITION_WRITE_COMPLETION, - REPORT_RESULT_PARTITION_FAILURE, NODE_REGISTRATION_RESULT, START_TASKS, @@ -625,39 +624,6 @@ @Override public FunctionId getFunctionId() { return FunctionId.REPORT_RESULT_PARTITION_WRITE_COMPLETION; - } - - public JobId getJobId() { - return jobId; - } - - public ResultSetId getResultSetId() { - return rsId; - } - - public int getPartition() { - return partition; - } - } - - public static class ReportResultPartitionFailureFunction extends Function { - private static final long serialVersionUID = 1L; - - private final JobId jobId; - - private final ResultSetId rsId; - - private final int partition; - - public ReportResultPartitionFailureFunction(JobId jobId, ResultSetId rsId, int partition) { - this.jobId = jobId; - this.rsId = rsId; - this.partition = partition; - } - - @Override - public FunctionId getFunctionId() { - return FunctionId.REPORT_RESULT_PARTITION_FAILURE; } public JobId getJobId() { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java index 31cb855..4707487 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java @@ -150,13 +150,6 @@ } @Override - public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) throws Exception { - ReportResultPartitionFailureFunction fn = new ReportResultPartitionFailureFunction( - jobId, rsId, partition); - ensureIpcHandle().send(-1, fn, null); - } - - @Override public void notifyDistributedJobFailure(JobId jobId, String nodeId) throws Exception { ReportDistributedJobFailureFunction fn = new ReportDistributedJobFailureFunction( jobId, nodeId); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java index ad4881a..74a628d 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java @@ -22,13 +22,13 @@ import java.io.Serializable; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.HashMap; import java.util.Hashtable; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Semaphore; import java.util.logging.Level; @@ -121,7 +121,7 @@ opEnv = joblet.getEnvironment(); partitionSendProfile = new Hashtable<>(); pendingThreads = new LinkedHashSet<>(); - exceptions = new ArrayList<>(); + exceptions = new CopyOnWriteArrayList<>(); // Multiple threads could add 
exceptions to this list. this.ncs = ncs; this.inputChannelsFromConnectors = inputChannelsFromConnectors; } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java index c7b563b..361ee37 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java @@ -111,17 +111,6 @@ } @Override - public void reportPartitionFailure(JobId jobId, ResultSetId rsId, int partition) throws HyracksException { - try { - LOGGER.info("Reporting partition failure: JobId: " + jobId + " ResultSetId: " + rsId + " partition: " - + partition); - ncs.getClusterController().reportResultPartitionFailure(jobId, rsId, partition); - } catch (Exception e) { - throw new HyracksException(e); - } - } - - @Override public void initializeDatasetPartitionReader(JobId jobId, ResultSetId resultSetId, int partition, IFrameWriter writer) throws HyracksException { ResultState resultState = getResultState(jobId, resultSetId, partition); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java index 9ca2d68..81f5551 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java +++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java @@ -103,7 +103,6 @@ resultState.closeAndDelete(); resultState.abort(); registerResultPartitionLocation(false); - manager.reportPartitionFailure(jobId, resultSetId, partition); } catch (HyracksException e) { throw new HyracksDataException(e); } -- To view, visit https://asterix-gerrit.ics.uci.edu/1883 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I36f243c98876ff40e2539ca9241ff6d19fee929f Gerrit-PatchSet: 9 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Yingyi Bu <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]>
