Yingyi Bu has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1883
Change subject: [WIP] [ASTERIXDB-1982][FAIL] Fix error reporting in SuperActivity.
......................................................................
[WIP] [ASTERIXDB-1982][FAIL] Fix error reporting in SuperActivity.
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
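- Fix error reporting in SuperActivityOperatorNodePushable: exceptions thrown by
  the operator actions run in parallel are collected and rethrown, with the first
  one as the primary exception and the others attached as suppressed exceptions;
- Unify error reporting in result distribution and the operator pipeline by
  removing the separate result-partition failure report
  (reportPartitionFailure / REPORT_RESULT_PARTITION_FAILURE).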
Change-Id: I36f243c98876ff40e2539ca9241ff6d19fee929f
---
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
D
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionFailureWork.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
11 files changed, 14 insertions(+), 129 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/83/1883/1
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
index f79ce53..3119ddd 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -21,7 +21,6 @@
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.io.IWorkspaceFileFactory;
import org.apache.hyracks.api.job.JobId;
public interface IDatasetPartitionManager extends IDatasetManager {
@@ -33,8 +32,6 @@
public void reportPartitionWriteCompletion(JobId jobId, ResultSetId
resultSetId, int partition)
throws HyracksException;
-
- public void reportPartitionFailure(JobId jobId, ResultSetId resultSetId,
int partition) throws HyracksException;
public void initializeDatasetPartitionReader(JobId jobId, ResultSetId
resultSetId, int partition, IFrameWriter noc)
throws HyracksException;
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 7fdf106..5126d71 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -25,8 +25,9 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Queue;
+import java.util.Map.Entry;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
@@ -192,6 +193,7 @@
private void runInParallel(OperatorNodePushableAction action) throws
HyracksDataException {
List<Future<Void>> tasks = new ArrayList<>();
+ List<HyracksDataException> exceptions = new CopyOnWriteArrayList();
final Semaphore startSemaphore = new Semaphore(1 -
operatorNodePushablesBFSOrder.size());
final Semaphore completeSemaphore = new Semaphore(1 -
operatorNodePushablesBFSOrder.size());
try {
@@ -200,6 +202,8 @@
startSemaphore.release();
try {
action.run(op);
+ } catch (HyracksDataException e) {
+ exceptions.add(e);
} finally {
completeSemaphore.release();
}
@@ -217,6 +221,15 @@
cancelTasks(tasks, startSemaphore, completeSemaphore);
throw HyracksDataException.create(e);
}
+
+ // Reports exceptions that are encountered in executions.
+ if (!exceptions.isEmpty()) {
+ HyracksDataException exception = exceptions.get(0);
+ for (int i = 1; i < exceptions.size(); ++i) {
+ exception.addSuppressed(exceptions.get(i));
+ }
+ throw exception;
+ }
}
private void cancelTasks(List<Future<Void>> tasks, Semaphore
startSemaphore, Semaphore completeSemaphore) {
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
index 53d7620..350984c 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -36,7 +36,6 @@
import org.apache.hyracks.control.cc.work.RegisterPartitionRequestWork;
import org.apache.hyracks.control.cc.work.RegisterResultPartitionLocationWork;
import org.apache.hyracks.control.cc.work.ReportProfilesWork;
-import org.apache.hyracks.control.cc.work.ReportResultPartitionFailureWork;
import
org.apache.hyracks.control.cc.work.ReportResultPartitionWriteCompletionWork;
import org.apache.hyracks.control.cc.work.TaskCompleteWork;
import org.apache.hyracks.control.cc.work.TaskFailureWork;
@@ -129,12 +128,6 @@
(CCNCFunctions.ReportResultPartitionWriteCompletionFunction) fn;
ccs.getWorkQueue().schedule(new
ReportResultPartitionWriteCompletionWork(ccs,
rrpwc.getJobId(), rrpwc.getResultSetId(),
rrpwc.getPartition()));
- break;
- case REPORT_RESULT_PARTITION_FAILURE:
- CCNCFunctions.ReportResultPartitionFailureFunction rrpf =
- (CCNCFunctions.ReportResultPartitionFailureFunction)
fn;
- ccs.getWorkQueue().schedule(new
ReportResultPartitionFailureWork(ccs,
- rrpf.getJobId(), rrpf.getResultSetId(),
rrpf.getPartition()));
break;
case SEND_APPLICATION_MESSAGE:
CCNCFunctions.SendApplicationMessageFunction rsf =
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 3cc41c5..0f87e03 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -149,16 +149,6 @@
}
@Override
- public synchronized void reportResultPartitionFailure(JobId jobId,
ResultSetId rsId, int partition) {
- DatasetJobRecord djr = getDatasetJobRecord(jobId);
- if (djr != null) {
- djr.fail(rsId, partition);
- }
- jobResultLocations.get(jobId).setException(new Exception());
- notifyAll();
- }
-
- @Override
public synchronized void reportJobFailure(JobId jobId, List<Exception>
exceptions) {
DatasetJobRecord djr = getDatasetJobRecord(jobId);
if (djr != null) {
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
index 663a53a..68d6c16 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
@@ -41,8 +41,6 @@
public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId
rsId, int partition)
throws HyracksDataException;
- public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId,
int partition);
-
public void reportJobFailure(JobId jobId, List<Exception> exceptions);
public Status getResultStatus(JobId jobId, ResultSetId rsId) throws
HyracksDataException;
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionFailureWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionFailureWork.java
deleted file mode 100644
index 2de89db..0000000
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionFailureWork.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.cc.work;
-
-import org.apache.hyracks.api.dataset.ResultSetId;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.common.work.AbstractWork;
-
-public class ReportResultPartitionFailureWork extends AbstractWork {
- private final ClusterControllerService ccs;
-
- private final JobId jobId;
-
- private final ResultSetId rsId;
-
- private final int partition;
-
- public ReportResultPartitionFailureWork(ClusterControllerService ccs,
JobId jobId, ResultSetId rsId, int partition) {
- this.ccs = ccs;
- this.jobId = jobId;
- this.rsId = rsId;
- this.partition = partition;
- }
-
- @Override
- public void run() {
- ccs.getDatasetDirectoryService().reportResultPartitionFailure(jobId,
rsId, partition);
- }
-
- @Override
- public String toString() {
- return "JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" +
partition;
- }
-}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index 4159594..ec8e045 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -69,8 +69,6 @@
public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId
rsId, int partition) throws Exception;
- public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId,
int partition) throws Exception;
-
public void getNodeControllerInfos() throws Exception;
public void notifyThreadDump(String nodeId, String requestId, String
threadDumpJSON) throws Exception;
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 620033c..d42c4a8 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -83,7 +83,6 @@
REGISTER_PARTITION_REQUEST,
REGISTER_RESULT_PARTITION_LOCATION,
REPORT_RESULT_PARTITION_WRITE_COMPLETION,
- REPORT_RESULT_PARTITION_FAILURE,
NODE_REGISTRATION_RESULT,
START_TASKS,
@@ -625,39 +624,6 @@
@Override
public FunctionId getFunctionId() {
return FunctionId.REPORT_RESULT_PARTITION_WRITE_COMPLETION;
- }
-
- public JobId getJobId() {
- return jobId;
- }
-
- public ResultSetId getResultSetId() {
- return rsId;
- }
-
- public int getPartition() {
- return partition;
- }
- }
-
- public static class ReportResultPartitionFailureFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final JobId jobId;
-
- private final ResultSetId rsId;
-
- private final int partition;
-
- public ReportResultPartitionFailureFunction(JobId jobId, ResultSetId
rsId, int partition) {
- this.jobId = jobId;
- this.rsId = rsId;
- this.partition = partition;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.REPORT_RESULT_PARTITION_FAILURE;
}
public JobId getJobId() {
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 31cb855..4707487 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -150,13 +150,6 @@
}
@Override
- public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId,
int partition) throws Exception {
- ReportResultPartitionFailureFunction fn = new
ReportResultPartitionFailureFunction(
- jobId, rsId, partition);
- ensureIpcHandle().send(-1, fn, null);
- }
-
- @Override
public void notifyDistributedJobFailure(JobId jobId, String nodeId) throws
Exception {
ReportDistributedJobFailureFunction fn = new
ReportDistributedJobFailureFunction(
jobId, nodeId);
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
index c7b563b..361ee37 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -111,17 +111,6 @@
}
@Override
- public void reportPartitionFailure(JobId jobId, ResultSetId rsId, int
partition) throws HyracksException {
- try {
- LOGGER.info("Reporting partition failure: JobId: " + jobId + "
ResultSetId: " + rsId + " partition: "
- + partition);
- ncs.getClusterController().reportResultPartitionFailure(jobId,
rsId, partition);
- } catch (Exception e) {
- throw new HyracksException(e);
- }
- }
-
- @Override
public void initializeDatasetPartitionReader(JobId jobId, ResultSetId
resultSetId, int partition,
IFrameWriter writer) throws HyracksException {
ResultState resultState = getResultState(jobId, resultSetId,
partition);
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index 9ca2d68..81f5551 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -103,7 +103,6 @@
resultState.closeAndDelete();
resultState.abort();
registerResultPartitionLocation(false);
- manager.reportPartitionFailure(jobId, resultSetId, partition);
} catch (HyracksException e) {
throw new HyracksDataException(e);
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1883
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I36f243c98876ff40e2539ca9241ff6d19fee929f
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Yingyi Bu <[email protected]>