Yingyi Bu has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1883

Change subject: [WIP] [ASTERIXDB-1982][FAIL] Fix error reporting in SuperActivity.
......................................................................

[WIP] [ASTERIXDB-1982][FAIL] Fix error reporting in SuperActivity.

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Fix error reporting in SuperActivityOperatorNodePushable;
- Unify error reporting in result distribution and operator pipeline.

Change-Id: I36f243c98876ff40e2539ca9241ff6d19fee929f
---
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
D hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionFailureWork.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
11 files changed, 14 insertions(+), 129 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/83/1883/1

diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
index f79ce53..3119ddd 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -21,7 +21,6 @@
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.io.IWorkspaceFileFactory;
 import org.apache.hyracks.api.job.JobId;
 
 public interface IDatasetPartitionManager extends IDatasetManager {
@@ -33,8 +32,6 @@
 
     public void reportPartitionWriteCompletion(JobId jobId, ResultSetId 
resultSetId, int partition)
             throws HyracksException;
-
-    public void reportPartitionFailure(JobId jobId, ResultSetId resultSetId, 
int partition) throws HyracksException;
 
     public void initializeDatasetPartitionReader(JobId jobId, ResultSetId 
resultSetId, int partition, IFrameWriter noc)
             throws HyracksException;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 7fdf106..5126d71 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -25,8 +25,9 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Queue;
+import java.util.Map.Entry;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 
@@ -192,6 +193,7 @@
 
     private void runInParallel(OperatorNodePushableAction action) throws 
HyracksDataException {
         List<Future<Void>> tasks = new ArrayList<>();
+        List<HyracksDataException> exceptions = new CopyOnWriteArrayList();
         final Semaphore startSemaphore = new Semaphore(1 - 
operatorNodePushablesBFSOrder.size());
         final Semaphore completeSemaphore = new Semaphore(1 - 
operatorNodePushablesBFSOrder.size());
         try {
@@ -200,6 +202,8 @@
                     startSemaphore.release();
                     try {
                         action.run(op);
+                    } catch (HyracksDataException e) {
+                        exceptions.add(e);
                     } finally {
                         completeSemaphore.release();
                     }
@@ -217,6 +221,15 @@
             cancelTasks(tasks, startSemaphore, completeSemaphore);
             throw HyracksDataException.create(e);
         }
+
+        // Reports exceptions that are encountered in executions.
+        if (!exceptions.isEmpty()) {
+            HyracksDataException exception = exceptions.get(0);
+            for (int i = 1; i < exceptions.size(); ++i) {
+                exception.addSuppressed(exceptions.get(i));
+            }
+            throw exception;
+        }
     }
 
     private void cancelTasks(List<Future<Void>> tasks, Semaphore 
startSemaphore, Semaphore completeSemaphore) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
index 53d7620..350984c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -36,7 +36,6 @@
 import org.apache.hyracks.control.cc.work.RegisterPartitionRequestWork;
 import org.apache.hyracks.control.cc.work.RegisterResultPartitionLocationWork;
 import org.apache.hyracks.control.cc.work.ReportProfilesWork;
-import org.apache.hyracks.control.cc.work.ReportResultPartitionFailureWork;
 import 
org.apache.hyracks.control.cc.work.ReportResultPartitionWriteCompletionWork;
 import org.apache.hyracks.control.cc.work.TaskCompleteWork;
 import org.apache.hyracks.control.cc.work.TaskFailureWork;
@@ -129,12 +128,6 @@
                         
(CCNCFunctions.ReportResultPartitionWriteCompletionFunction) fn;
                 ccs.getWorkQueue().schedule(new 
ReportResultPartitionWriteCompletionWork(ccs,
                         rrpwc.getJobId(), rrpwc.getResultSetId(), 
rrpwc.getPartition()));
-                break;
-            case REPORT_RESULT_PARTITION_FAILURE:
-                CCNCFunctions.ReportResultPartitionFailureFunction rrpf =
-                        (CCNCFunctions.ReportResultPartitionFailureFunction) 
fn;
-                ccs.getWorkQueue().schedule(new 
ReportResultPartitionFailureWork(ccs,
-                        rrpf.getJobId(), rrpf.getResultSetId(), 
rrpf.getPartition()));
                 break;
             case SEND_APPLICATION_MESSAGE:
                 CCNCFunctions.SendApplicationMessageFunction rsf =
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 3cc41c5..0f87e03 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -149,16 +149,6 @@
     }
 
     @Override
-    public synchronized void reportResultPartitionFailure(JobId jobId, 
ResultSetId rsId, int partition) {
-        DatasetJobRecord djr = getDatasetJobRecord(jobId);
-        if (djr != null) {
-            djr.fail(rsId, partition);
-        }
-        jobResultLocations.get(jobId).setException(new Exception());
-        notifyAll();
-    }
-
-    @Override
     public synchronized void reportJobFailure(JobId jobId, List<Exception> 
exceptions) {
         DatasetJobRecord djr = getDatasetJobRecord(jobId);
         if (djr != null) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
index 663a53a..68d6c16 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
@@ -41,8 +41,6 @@
     public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId 
rsId, int partition)
             throws HyracksDataException;
 
-    public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, 
int partition);
-
     public void reportJobFailure(JobId jobId, List<Exception> exceptions);
 
     public Status getResultStatus(JobId jobId, ResultSetId rsId) throws 
HyracksDataException;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionFailureWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionFailureWork.java
deleted file mode 100644
index 2de89db..0000000
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionFailureWork.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.cc.work;
-
-import org.apache.hyracks.api.dataset.ResultSetId;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.common.work.AbstractWork;
-
-public class ReportResultPartitionFailureWork extends AbstractWork {
-    private final ClusterControllerService ccs;
-
-    private final JobId jobId;
-
-    private final ResultSetId rsId;
-
-    private final int partition;
-
-    public ReportResultPartitionFailureWork(ClusterControllerService ccs, 
JobId jobId, ResultSetId rsId, int partition) {
-        this.ccs = ccs;
-        this.jobId = jobId;
-        this.rsId = rsId;
-        this.partition = partition;
-    }
-
-    @Override
-    public void run() {
-        ccs.getDatasetDirectoryService().reportResultPartitionFailure(jobId, 
rsId, partition);
-    }
-
-    @Override
-    public String toString() {
-        return "JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" + 
partition;
-    }
-}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index 4159594..ec8e045 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -69,8 +69,6 @@
 
     public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId 
rsId, int partition) throws Exception;
 
-    public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, 
int partition) throws Exception;
-
     public void getNodeControllerInfos() throws Exception;
 
     public void notifyThreadDump(String nodeId, String requestId, String 
threadDumpJSON) throws Exception;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 620033c..d42c4a8 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -83,7 +83,6 @@
         REGISTER_PARTITION_REQUEST,
         REGISTER_RESULT_PARTITION_LOCATION,
         REPORT_RESULT_PARTITION_WRITE_COMPLETION,
-        REPORT_RESULT_PARTITION_FAILURE,
 
         NODE_REGISTRATION_RESULT,
         START_TASKS,
@@ -625,39 +624,6 @@
         @Override
         public FunctionId getFunctionId() {
             return FunctionId.REPORT_RESULT_PARTITION_WRITE_COMPLETION;
-        }
-
-        public JobId getJobId() {
-            return jobId;
-        }
-
-        public ResultSetId getResultSetId() {
-            return rsId;
-        }
-
-        public int getPartition() {
-            return partition;
-        }
-    }
-
-    public static class ReportResultPartitionFailureFunction extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final JobId jobId;
-
-        private final ResultSetId rsId;
-
-        private final int partition;
-
-        public ReportResultPartitionFailureFunction(JobId jobId, ResultSetId 
rsId, int partition) {
-            this.jobId = jobId;
-            this.rsId = rsId;
-            this.partition = partition;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.REPORT_RESULT_PARTITION_FAILURE;
         }
 
         public JobId getJobId() {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 31cb855..4707487 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -150,13 +150,6 @@
     }
 
     @Override
-    public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, 
int partition) throws Exception {
-        ReportResultPartitionFailureFunction fn = new 
ReportResultPartitionFailureFunction(
-                jobId, rsId, partition);
-        ensureIpcHandle().send(-1, fn, null);
-    }
-
-    @Override
     public void notifyDistributedJobFailure(JobId jobId, String nodeId) throws 
Exception {
         ReportDistributedJobFailureFunction fn = new 
ReportDistributedJobFailureFunction(
                 jobId, nodeId);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
index c7b563b..361ee37 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -111,17 +111,6 @@
     }
 
     @Override
-    public void reportPartitionFailure(JobId jobId, ResultSetId rsId, int 
partition) throws HyracksException {
-        try {
-            LOGGER.info("Reporting partition failure: JobId: " + jobId + " 
ResultSetId: " + rsId + " partition: "
-                    + partition);
-            ncs.getClusterController().reportResultPartitionFailure(jobId, 
rsId, partition);
-        } catch (Exception e) {
-            throw new HyracksException(e);
-        }
-    }
-
-    @Override
     public void initializeDatasetPartitionReader(JobId jobId, ResultSetId 
resultSetId, int partition,
             IFrameWriter writer) throws HyracksException {
         ResultState resultState = getResultState(jobId, resultSetId, 
partition);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index 9ca2d68..81f5551 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -103,7 +103,6 @@
             resultState.closeAndDelete();
             resultState.abort();
             registerResultPartitionLocation(false);
-            manager.reportPartitionFailure(jobId, resultSetId, partition);
         } catch (HyracksException e) {
             throw new HyracksDataException(e);
         }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1883
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I36f243c98876ff40e2539ca9241ff6d19fee929f
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Yingyi Bu <[email protected]>

Reply via email to