abdullah alamoudi has submitted this change and it was merged.

Change subject: [NO ISSUE][STO] Report batch operation failure before exiting
......................................................................


[NO ISSUE][STO] Report batch operation failure before exiting

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- In certain cases, callers of a batch operation are
  interested in failure events.
- In those cases, we used to report the failure after exiting
  the components; with this change, the failure is reported
  before the exit.

Change-Id: I0c22b6bddfe8f12ef8e3c59dae0b0c585137a126
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2956
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhub...@apache.org>
---
M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
5 files changed, 17 insertions(+), 18 deletions(-)

Approvals:
  Anon. E. Moose #1000171: 
  Jenkins: Verified; Verified
  Murtadha Hubail: Looks good to me, approved

Objections:
  Jenkins: Violations found



diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
index b1a1fcc..453ffa0 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
@@ -26,7 +26,6 @@
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
@@ -134,10 +133,5 @@
     @Override
     public final IFrameWriter getInputFrameWriter(int index) {
         return null;
-    }
-
-    @Override
-    public JobId getJobId() {
-        return ctx.getJobletContext().getJobId();
     }
 }
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
index 2da7193..a52f01e 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
@@ -21,7 +21,6 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
 
 public interface IActiveRuntime {
 
@@ -42,11 +41,6 @@
      * @throws InterruptedException
      */
     void stop(long timeout, TimeUnit unit) throws HyracksDataException, InterruptedException;
-
-    /**
-     * @return the job id associated with this active runtime
-     */
-    JobId getJobId();
 
     /**
      * @return the runtime stats for monitoring purposes
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index ba8074f..b855981 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -206,6 +206,12 @@
             public void finish() throws HyracksDataException {
                 lsmAccessor.getCtx().setOperation(IndexOperation.UPSERT);
             }
+
+            @Override
+            public void fail(Throwable th) {
+                // We must fail before we exit the components
+                frameOpCallback.fail(th);
+            }
         };
         tracer = ctx.getJobletContext().getServiceContext().getTracer();
         traceCategory = tracer.getRegistry().get(TraceUtils.LATENCY);
@@ -314,12 +320,7 @@
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         accessor.reset(buffer);
         int itemCount = accessor.getTupleCount();
-        try {
-            lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback);
-        } catch (Throwable th) {// NOSONAR: Must notify of all failures
-            frameOpCallback.fail(th);
-            throw th;
-        }
+        lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback);
         if (itemCount > 0) {
             lastRecordInTimeStamp = System.currentTimeMillis();
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java
index 3fbe6cd..b6192c1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java
@@ -43,4 +43,11 @@
      * Called once per batch before ending the batch process
      */
     void finish() throws HyracksDataException;
+
+    /**
+     * Called when a failure is encountered processing a frame
+     *
+     * @param th
+     */
+    void fail(Throwable th);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index aa7be86..e9f6f20 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -700,6 +700,9 @@
             try {
                 processFrame(accessor, tuple, processor);
                 frameOpCallback.frameCompleted();
+            } catch (Throwable th) {
+                processor.fail(th);
+                throw th;
             } finally {
                 processor.finish();
             }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2956
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I0c22b6bddfe8f12ef8e3c59dae0b0c585137a126
Gerrit-PatchSet: 3
Gerrit-Project: asterixdb
Gerrit-Branch: stabilization-f69489
Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mb...@apache.org>
Gerrit-Reviewer: Murtadha Hubail <mhub...@apache.org>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: abdullah alamoudi <bamou...@gmail.com>

Reply via email to