abdullah alamoudi has submitted this change and it was merged.
Change subject: [NO ISSUE][STO] Report batch operation failure before exiting
..
[NO ISSUE][STO] Report batch operation failure before exiting
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- In certain cases, caller of a batch operation call are
interested in failure events.
- In those cases, we used to report failure after exiting
the components but with this change, failure reporting
happens before the exit.
Change-Id: I0c22b6bddfe8f12ef8e3c59dae0b0c585137a126
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2956
Tested-by: Jenkins
Integration-Tests: Jenkins
Reviewed-by: Murtadha Hubail
---
M
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
M
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
5 files changed, 17 insertions(+), 18 deletions(-)
Approvals:
Anon. E. Moose #1000171:
Jenkins: Verified; Verified
Murtadha Hubail: Looks good to me, approved
Objections:
Jenkins: Violations found
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
index b1a1fcc..453ffa0 100644
---
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
+++
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
@@ -26,7 +26,6 @@
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
import
org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
@@ -134,10 +133,5 @@
@Override
public final IFrameWriter getInputFrameWriter(int index) {
return null;
-}
-
-@Override
-public JobId getJobId() {
-return ctx.getJobletContext().getJobId();
}
}
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
index 2da7193..a52f01e 100644
---
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
+++
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
@@ -21,7 +21,6 @@
import java.util.concurrent.TimeUnit;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
public interface IActiveRuntime {
@@ -42,11 +41,6 @@
* @throws InterruptedException
*/
void stop(long timeout, TimeUnit unit) throws HyracksDataException,
InterruptedException;
-
-/**
- * @return the job id associated with this active runtime
- */
-JobId getJobId();
/**
* @return the runtime stats for monitoring purposes
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index ba8074f..b855981 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -206,6 +206,12 @@
public void finish() throws HyracksDataException {
lsmAccessor.getCtx().setOperation(IndexOperation.UPSERT);
}
+
+@Override
+public void fail(Throwable th) {
+// We must fail before we exit the components
+frameOpCallback.fail(th);
+}
};
tracer = ctx.getJobletContext().getServiceContext().getTracer();
traceCategory = tracer.getRegistry().get(TraceUtils.LATENCY);
@@ -314,12 +320,7 @@
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
accessor.reset(buffer);
int itemCount = accessor.getTupleCount();
-try {
-lsmAccessor.batchOperate(accessor, tuple, processor,
frameOpCallbac