>From Michael Blow <[email protected]>: Michael Blow has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19092 )
Change subject: [NO ISSUE][HYR][STO] Add pre-exit hook to IFrameOperationCallback ...................................................................... [NO ISSUE][HYR][STO] Add pre-exit hook to IFrameOperationCallback Ext-ref: MB-64229 Change-Id: I911884a0f4f6d66d750e452b6b8049ad67d0b00a Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19092 Reviewed-by: Ali Alsuliman <[email protected]> Tested-by: Jenkins <[email protected]> --- M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java 7 files changed, 63 insertions(+), 12 deletions(-) Approvals: Ali Alsuliman: Looks good to me, approved Jenkins: Verified diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java index 915f3b3..3ab86c0 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java @@ -47,6 +47,11 @@ } @Override + public void beforeExit(boolean success) throws HyracksDataException { + // No Op + } + + @Override public void close() throws IOException { // No Op } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java index 643c6ab..d2dc3cf 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java @@ -293,6 +293,11 @@ } @Override + public void beforeExit(boolean success) throws HyracksDataException { + callback.beforeExit(success); + } + + @Override public void close() throws IOException { callback.close(); } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java index f9f758b..40465c6 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java @@ -20,6 +20,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.IBatchController; +import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; @@ -31,12 +32,14 @@ } @Override - public void batchEnter(ILSMHarness lsmHarness, ILSMIndexOperationContext ctx) throws HyracksDataException { + public void batchEnter(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback) + throws HyracksDataException { lsmHarness.enter(ctx, LSMOperationType.MODIFICATION); } @Override - public void batchExit(ILSMHarness lsmHarness, ILSMIndexOperationContext ctx) throws HyracksDataException { - lsmHarness.exit(ctx, LSMOperationType.MODIFICATION); + public void batchExit(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback, + boolean batchSuccessful) throws HyracksDataException { + lsmHarness.exit(ctx, callback, batchSuccessful, LSMOperationType.MODIFICATION); } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java index 879a8d2..e061589 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java @@ -23,7 +23,9 @@ public interface IBatchController { String KEY_BATCH_CONTROLLER = "BATCH_CONTROLLER"; - void batchEnter(ILSMHarness lsmHarness, ILSMIndexOperationContext ctx) throws HyracksDataException; + void batchEnter(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback) + throws HyracksDataException; - void batchExit(ILSMHarness lsmHarness, ILSMIndexOperationContext ctx) throws HyracksDataException; + void batchExit(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback, + boolean batchSuccessful) throws HyracksDataException; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java index 1f89af2..2fbc0c5 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java @@ -28,14 +28,24 @@ public interface IFrameOperationCallback extends Closeable { /** * Called once processing the frame is done before calling nextFrame on the next IFrameWriter in - * the pipeline + * the pipeline. In the event this frame completion will also exit the component, this will be + * called prior to {@link #beforeExit(boolean)}. * * @throws HyracksDataException */ void frameCompleted() throws HyracksDataException; /** - * Called when the task has failed. + * Called just prior to exiting the component on batch completion: not all batches may result + * in a component exit, depending on the decision of the {@link IBatchController}. + * + * @throws HyracksDataException + */ + void beforeExit(boolean success) throws HyracksDataException; + + /** + * Called when the batch processing, {@link #frameCompleted()} or {@link #beforeExit(boolean)} + * invocation has failed. * * @param th */ diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java index 721d809..68de45a 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java @@ -228,7 +228,8 @@ void enter(ILSMIndexOperationContext ctx, LSMOperationType opType) throws HyracksDataException; - void exit(ILSMIndexOperationContext ctx, LSMOperationType op) throws HyracksDataException; + void exit(ILSMIndexOperationContext ctx, IFrameOperationCallback callback, boolean success, LSMOperationType op) + throws HyracksDataException; /** * Rollback components that match the passed predicate diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java index 6e20686..017e767 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java @@ -698,8 +698,18 @@ } @Override - public void exit(ILSMIndexOperationContext ctx, LSMOperationType op) throws HyracksDataException { - getAndExitComponentsAndComplete(ctx, op); + public void exit(ILSMIndexOperationContext ctx, IFrameOperationCallback callback, boolean success, + LSMOperationType op) throws HyracksDataException { + try { + callback.beforeExit(success); + } catch (Throwable th) { + // TODO(mblow): we don't distinguish between the three distinct phases we can encounter + // failures in the callback API- we might need this eventually + callback.fail(th); + throw th; + } finally { + getAndExitComponentsAndComplete(ctx, op); + } } private void getAndExitComponentsAndComplete(ILSMIndexOperationContext ctx, LSMOperationType op) @@ -717,10 +727,12 @@ IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, IBatchController batchController) throws HyracksDataException { processor.start(); - batchController.batchEnter(this, ctx); + batchController.batchEnter(ctx, this, frameOpCallback); + boolean success = false; try { try { processFrame(accessor, tuple, processor); + success = true; frameOpCallback.frameCompleted(); } catch (Throwable th) { processor.fail(th); @@ -732,7 +744,7 @@ LOGGER.warn("Failed to process frame", e); throw e; } finally { - batchController.batchExit(this, ctx); + batchController.batchExit(ctx, this, frameOpCallback, success); ctx.logPerformanceCounters(accessor.getTupleCount()); } } -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19092 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: neo Gerrit-Change-Id: I911884a0f4f6d66d750e452b6b8049ad67d0b00a Gerrit-Change-Number: 19092 Gerrit-PatchSet: 6 Gerrit-Owner: Michael Blow <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail Gerrit-CC: Anon. E. Moose #1000171 Gerrit-MessageType: merged
