>From Michael Blow <[email protected]>: Michael Blow has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19072 )
Change subject: [NO ISSUE][HYR][STO,MISC] LSM enhancements, misc utilities ...................................................................... [NO ISSUE][HYR][STO,MISC] LSM enhancements, misc utilities - provide ability to intercept entering / exiting components on batch LSM operations - += HyracksThrowingAction, InvokeUtil.tryHyracksWithCleanups Ext-ref: MB-64229 Change-Id: I03f573d44b170f0c4d889920a3991592fb2890e1 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19072 Reviewed-by: Michael Blow <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> Tested-by: Michael Blow <[email protected]> --- A asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java A hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java A hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksThrowingAction.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java 12 files changed, 208 insertions(+), 43 deletions(-) Approvals: Michael Blow: Looks good to me, but someone else must approve; Verified Ali Alsuliman: Looks good to me, approved diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java index eadb614..5c87994 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java @@ -18,6 +18,8 @@ */ package org.apache.asterix.runtime.operators; +import static org.apache.hyracks.storage.am.lsm.common.api.IBatchController.KEY_BATCH_CONTROLLER; + import java.nio.ByteBuffer; import org.apache.asterix.common.api.INcApplicationContext; @@ -33,6 +35,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.SourceLocation; import org.apache.hyracks.api.util.CleanupUtils; +import org.apache.hyracks.api.util.HyracksThrowingAction; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; @@ -50,6 +53,7 @@ import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; +import org.apache.hyracks.storage.am.lsm.common.api.IBatchController; import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor; import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable; @@ -77,6 +81,7 @@ private boolean flushedPartialTuples; private int currentTupleIdx; private int lastFlushedTupleIdx; + private IBatchController batchController; private final PermutingFrameTupleReference keyTuple; @@ -116,6 +121,8 @@ protected IFrameTupleProcessor createTupleProcessor(SourceLocation sourceLoc) { return new IFrameTupleProcessor() { + private HyracksThrowingAction exitAction; + @Override public void process(ITupleReference tuple, int index) throws HyracksDataException { if (index < currentTupleIdx) { @@ -219,6 +226,7 @@ (INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext(); LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index, appCtx.getTransactionSubsystem().getLogManager()); + batchController = TaskUtil.getOrDefault(KEY_BATCH_CONTROLLER, ctx, StandardBatchController.INSTANCE); } catch (Throwable e) { // NOSONAR: Re-thrown throw HyracksDataException.create(e); } @@ -227,7 +235,7 @@ @Override public void nextFrame(ByteBuffer buffer) throws HyracksDataException { accessor.reset(buffer); - lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback); + lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, batchController); writeBuffer.ensureFrameSize(buffer.capacity()); if (flushedPartialTuples) { diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java index 3b8ee68..643c6ab 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java @@ -18,6 +18,8 @@ */ package org.apache.asterix.runtime.operators; +import static org.apache.hyracks.storage.am.lsm.common.api.IBatchController.KEY_BATCH_CONTROLLER; + import java.io.DataOutput; import java.io.IOException; import java.nio.ByteBuffer; @@ -62,6 +64,7 @@ import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; +import org.apache.hyracks.storage.am.lsm.common.api.IBatchController; import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory; import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor; @@ -117,6 +120,7 @@ private final ITracer tracer; private final long traceCategory; private long lastRecordInTimeStamp = 0L; + private IBatchController batchController; public LSMPrimaryUpsertOperatorNodePushable(IHyracksTaskContext ctx, int partition, IIndexDataflowHelperFactory indexHelperFactory, int[] fieldPermutation, RecordDescriptor inputRecDesc, @@ -304,6 +308,7 @@ } }; frameOpCallback.open(); + batchController = TaskUtil.getOrDefault(KEY_BATCH_CONTROLLER, ctx, StandardBatchController.INSTANCE); } catch (Throwable e) { // NOSONAR: Re-thrown throw HyracksDataException.create(e); } @@ -344,7 +349,7 @@ public void nextFrame(ByteBuffer buffer) throws HyracksDataException { accessor.reset(buffer); int itemCount = accessor.getTupleCount(); - lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback); + lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, batchController); if (itemCount > 0) { lastRecordInTimeStamp = System.currentTimeMillis(); } @@ -484,4 +489,5 @@ public void flush() throws HyracksDataException { // No op since nextFrame flushes by default } + } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java new file mode 100644 index 0000000..f9f758b --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.operators; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.lsm.common.api.IBatchController; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; +import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; + +class StandardBatchController implements IBatchController { + static final IBatchController INSTANCE = new StandardBatchController(); + + private StandardBatchController() { + } + + @Override + public void batchEnter(ILSMHarness lsmHarness, ILSMIndexOperationContext ctx) throws HyracksDataException { + lsmHarness.enter(ctx, LSMOperationType.MODIFICATION); + } + + @Override + public void batchExit(ILSMHarness lsmHarness, ILSMIndexOperationContext ctx) throws HyracksDataException { + lsmHarness.exit(ctx, LSMOperationType.MODIFICATION); + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java index 0fc24c7..98215bf 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java @@ -63,7 +63,7 @@ * @param writer the FrameWriter to write to and flush * @throws HyracksDataException */ - public default void flush(IFrameWriter writer) throws HyracksDataException { + default void flush(IFrameWriter writer) throws HyracksDataException { write(writer, true); writer.flush(); } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksThrowingAction.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksThrowingAction.java new file mode 100644 index 0000000..7e3d599 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksThrowingAction.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.util; + +import org.apache.hyracks.api.exceptions.HyracksDataException; + +@FunctionalInterface +public interface HyracksThrowingAction { + void run() throws HyracksDataException; // NOSONAR +} diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java index f4b6e20..d331ab2 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java @@ -285,6 +285,42 @@ } } + @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions" }) // catching Throwable, instanceofs + public static void tryHyracksWithCleanups(HyracksThrowingAction action, HyracksThrowingAction... cleanups) + throws HyracksDataException { + Throwable savedT = null; + boolean suppressedInterrupted = false; + try { + action.run(); + } catch (Throwable t) { + savedT = t; + } finally { + for (HyracksThrowingAction cleanup : cleanups) { + try { + cleanup.run(); + } catch (Throwable t) { + if (savedT != null) { + savedT.addSuppressed(t); + suppressedInterrupted = suppressedInterrupted || t instanceof InterruptedException; + } else { + savedT = t; + } + } + } + } + if (savedT == null) { + return; + } + if (suppressedInterrupted) { + Thread.currentThread().interrupt(); + } + if (savedT instanceof Error) { + throw (Error) savedT; + } else { + throw HyracksDataException.create(savedT); + } + } + /** * Runs the supplied action, after suspending any pending interruption. An error will be logged if * the action is itself interrupted. diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java index c27a7e6..6c073f3 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java @@ -172,13 +172,4 @@ ++tupleCount; IntSerDeUtils.putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount); } - - /* - * Always write and then flush to send out the message if exists - */ - @Override - public void flush(IFrameWriter writer) throws HyracksDataException { - write(writer, true); - writer.flush(); - } } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java index 75c95b0..b77883d 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java @@ -74,4 +74,18 @@ Map<String, Object> sharedMap = TaskUtil.getSharedMap(ctx, false); return sharedMap == null ? null : (T) sharedMap.get(key); } + + /** + * get a <T> object from the shared map of the task, or returns the default value + * + * @param key + * @param ctx + * @param defaultValue + * @return the value associated with the key casted as T + */ + @SuppressWarnings("unchecked") + public static <T> T getOrDefault(String key, IHyracksTaskContext ctx, T defaultValue) { + Map<String, T> sharedMap = (Map<String, T>) TaskUtil.getSharedMap(ctx, false); + return sharedMap == null ? defaultValue : sharedMap.getOrDefault(key, defaultValue); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java new file mode 100644 index 0000000..879a8d2 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.am.lsm.common.api; + +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public interface IBatchController { + String KEY_BATCH_CONTROLLER = "BATCH_CONTROLLER"; + + void batchEnter(ILSMHarness lsmHarness, ILSMIndexOperationContext ctx) throws HyracksDataException; + + void batchExit(ILSMHarness lsmHarness, ILSMIndexOperationContext ctx) throws HyracksDataException; +} diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java index 9e8c568..721d809 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java @@ -39,7 +39,6 @@ * @param tuple * the operation tuple * @throws HyracksDataException - * @throws IndexException */ void forceModify(ILSMIndexOperationContext ctx, ITupleReference tuple) throws HyracksDataException; @@ -54,7 +53,6 @@ * the operation tuple * @return * @throws HyracksDataException - * @throws IndexException */ boolean modify(ILSMIndexOperationContext ctx, boolean tryOperation, ITupleReference tuple) throws HyracksDataException; @@ -69,7 +67,6 @@ * @param pred * the search predicate * @throws HyracksDataException - * @throws IndexException */ void search(ILSMIndexOperationContext ctx, IIndexCursor cursor, ISearchPredicate pred) throws HyracksDataException; @@ -104,9 +101,7 @@ * Schedule a merge * * @param ctx - * @param callback * @throws HyracksDataException - * @throws IndexException */ ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx) throws HyracksDataException; @@ -114,9 +109,7 @@ * Schedule full merge * * @param ctx - * @param callback * @throws HyracksDataException - * @throws IndexException */ ILSMIOOperation scheduleFullMerge(ILSMIndexOperationContext ctx) throws HyracksDataException; @@ -125,7 +118,6 @@ * * @param operation * @throws HyracksDataException - * @throws IndexException */ void merge(ILSMIOOperation operation) throws HyracksDataException; @@ -133,7 +125,6 @@ * Schedule a flush * * @param ctx - * @param callback * @throws HyracksDataException */ ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx) throws HyracksDataException; @@ -143,7 +134,6 @@ * * @param operation * @throws HyracksDataException - * @throws IndexException */ void flush(ILSMIOOperation operation) throws HyracksDataException; @@ -153,7 +143,6 @@ * @param ioOperation * the io operation that added the new component * @throws HyracksDataException - * @throws IndexException */ void addBulkLoadedComponent(ILSMIOOperation ioOperation) throws HyracksDataException; @@ -225,20 +214,21 @@ /** * Perform batch operation on all tuples in the passed frame tuple accessor * - * @param ctx - * the operation ctx - * @param accessor - * the frame tuple accessor - * @param tuple - * the mutable tuple used to pass the tuple to the processor - * @param processor - * the tuple processor - * @param frameOpCallback - * the callback at the end of the frame + * @param ctx the operation ctx + * @param accessor the frame tuple accessor + * @param tuple the mutable tuple used to pass the tuple to the processor + * @param processor the tuple processor + * @param frameOpCallback the callback at the end of the frame + * @param batchController * @throws HyracksDataException */ void batchOperate(ILSMIndexOperationContext ctx, FrameTupleAccessor accessor, FrameTupleReference tuple, - IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback) throws HyracksDataException; + IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, IBatchController batchController) + throws HyracksDataException; + + void enter(ILSMIndexOperationContext ctx, LSMOperationType opType) throws HyracksDataException; + + void exit(ILSMIndexOperationContext ctx, LSMOperationType op) throws HyracksDataException; /** * Rollback components that match the passed predicate diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java index 950a8e5..6e20686 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java @@ -33,6 +33,7 @@ import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; +import org.apache.hyracks.storage.am.lsm.common.api.IBatchController; import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; @@ -688,15 +689,17 @@ lsmIndex.updateFilter(ctx, tuple); } - private void enter(ILSMIndexOperationContext ctx) throws HyracksDataException { + @Override + public void enter(ILSMIndexOperationContext ctx, LSMOperationType op) throws HyracksDataException { if (!lsmIndex.isMemoryComponentsAllocated()) { lsmIndex.allocateMemoryComponents(); } - getAndEnterComponents(ctx, LSMOperationType.MODIFICATION, false); + getAndEnterComponents(ctx, op, false); } - private void exit(ILSMIndexOperationContext ctx) throws HyracksDataException { - getAndExitComponentsAndComplete(ctx, LSMOperationType.MODIFICATION); + @Override + public void exit(ILSMIndexOperationContext ctx, LSMOperationType op) throws HyracksDataException { + getAndExitComponentsAndComplete(ctx, op); } private void getAndExitComponentsAndComplete(ILSMIndexOperationContext ctx, LSMOperationType op) @@ -711,9 +714,10 @@ @Override public void batchOperate(ILSMIndexOperationContext ctx, FrameTupleAccessor accessor, FrameTupleReference tuple, - IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback) throws HyracksDataException { + IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, IBatchController batchController) + throws HyracksDataException { processor.start(); - enter(ctx); + batchController.batchEnter(this, ctx); try { try { processFrame(accessor, tuple, processor); @@ -728,7 +732,7 @@ LOGGER.warn("Failed to process frame", e); throw e; } finally { - exit(ctx); + batchController.batchExit(this, ctx); ctx.logPerformanceCounters(accessor.getTupleCount()); } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java index 8412b8c..e688727 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java @@ -28,6 +28,7 @@ import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; +import org.apache.hyracks.storage.am.lsm.common.api.IBatchController; import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; @@ -210,8 +211,8 @@ } public void batchOperate(FrameTupleAccessor accessor, FrameTupleReference tuple, IFrameTupleProcessor processor, - IFrameOperationCallback frameOpCallback) throws HyracksDataException { - lsmHarness.batchOperate(ctx, accessor, tuple, processor, frameOpCallback); + IFrameOperationCallback frameOpCallback, IBatchController batchController) throws HyracksDataException { + lsmHarness.batchOperate(ctx, accessor, tuple, processor, frameOpCallback, batchController); } @Override -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19072 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: neo Gerrit-Change-Id: I03f573d44b170f0c4d889920a3991592fb2890e1 Gerrit-Change-Number: 19072 Gerrit-PatchSet: 6 Gerrit-Owner: Michael Blow <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-CC: Anon. E. Moose #1000171 Gerrit-MessageType: merged
