abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2956
Change subject: [NO ISSUE][STO] Report batch operation failure before exiting
......................................................................
[NO ISSUE][STO] Report batch operation failure before exiting
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- In certain cases, callers of a batch operation are
interested in failure events.
- In those cases, we used to report the failure after exiting
the components. With this change, the failure is reported
before the exit.
Change-Id: I0c22b6bddfe8f12ef8e3c59dae0b0c585137a126
---
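Note on the ordering described in the commit message: the following is a minimal, self-contained sketch of the control flow, not the actual LSMHarness code. The `Processor` interface is a reduced stand-in for IFrameTupleProcessor, and the `exitComponents` event and the outer `finally` are assumptions about the enclosing method made only for illustration; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class FailBeforeExitSketch {
    // Hypothetical stand-in for IFrameTupleProcessor, reduced to the two calls that matter here.
    interface Processor {
        void finish();

        void fail(Throwable th);
    }

    // Mirrors the shape of the patched try/catch/finally: fail() is invoked
    // inside the batch operation, before the components are exited.
    static void batchOperate(Runnable processFrame, Processor processor, List<String> events) {
        try {
            try {
                processFrame.run();
                events.add("frameCompleted");
            } catch (Throwable th) {
                processor.fail(th); // reported first
                throw th; // precise rethrow: only unchecked exceptions can reach here
            } finally {
                processor.finish();
            }
        } finally {
            // Assumption: the enclosing method exits the components at this point.
            events.add("exitComponents");
        }
    }

    // Runs one failing batch and returns the observed order of events.
    static List<String> runFailingBatch() {
        List<String> events = new ArrayList<>();
        Processor processor = new Processor() {
            @Override
            public void finish() {
                events.add("finish");
            }

            @Override
            public void fail(Throwable th) {
                events.add("fail: " + th.getMessage());
            }
        };
        try {
            batchOperate(() -> {
                throw new IllegalStateException("boom");
            }, processor, events);
        } catch (IllegalStateException e) {
            // Before this change, the failure callback was invoked from a caller-side catch like this one.
            events.add("caller saw: " + e.getMessage());
        }
        return events;
    }

    public static void main(String[] args) {
        runFailingBatch().forEach(System.out::println);
    }
}
```

In this sketch the failure event is recorded before both `finish` and the hypothetical `exitComponents` step, while a caller-side catch only runs after them, which is the ordering problem the change addresses.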
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
3 files changed, 17 insertions(+), 6 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/56/2956/1
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index ba8074f..b855981 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -206,6 +206,12 @@
public void finish() throws HyracksDataException {
lsmAccessor.getCtx().setOperation(IndexOperation.UPSERT);
}
+
+ @Override
+ public void fail(Throwable th) {
+ // We must fail before we exit the components
+ frameOpCallback.fail(th);
+ }
};
tracer = ctx.getJobletContext().getServiceContext().getTracer();
traceCategory = tracer.getRegistry().get(TraceUtils.LATENCY);
@@ -314,12 +320,7 @@
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
accessor.reset(buffer);
int itemCount = accessor.getTupleCount();
- try {
- lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback);
- } catch (Throwable th) {// NOSONAR: Must notify of all failures
- frameOpCallback.fail(th);
- throw th;
- }
+ lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback);
if (itemCount > 0) {
lastRecordInTimeStamp = System.currentTimeMillis();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java
index 3fbe6cd..b6192c1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java
@@ -43,4 +43,11 @@
* Called once per batch before ending the batch process
*/
void finish() throws HyracksDataException;
+
+ /**
+ * Called when a failure is encountered processing a frame
+ *
+ * @param th
+ */
+ void fail(Throwable th);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index aa7be86..e9f6f20 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -700,6 +700,9 @@
try {
processFrame(accessor, tuple, processor);
frameOpCallback.frameCompleted();
+ } catch (Throwable th) {
+ processor.fail(th);
+ throw th;
} finally {
processor.finish();
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/2956
Gerrit-MessageType: newchange
Gerrit-Change-Id: I0c22b6bddfe8f12ef8e3c59dae0b0c585137a126
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: stabilization-f69489
Gerrit-Owner: abdullah alamoudi <[email protected]>