From Michael Blow <mb...@apache.org>: Michael Blow has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19097 )
Change subject: Merge branch 'gerrit/trinity' into 'gerrit/goldfish' ...................................................................... Merge branch 'gerrit/trinity' into 'gerrit/goldfish' Ext-ref: MB-64229 Change-Id: Ia47a6630f71e75b181afe8cdddd5dcfdbbda4a05 --- M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java 7 files changed, 59 insertions(+), 124 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/97/19097/1 diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java index 4753cd1..9b8b103 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java @@ -40,12 +40,8 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.SourceLocation; import org.apache.hyracks.api.util.CleanupUtils; -<<<<<<< HEAD (499290 [ASTERIXDB-3503][EXT] Improve logic of distributing files to) import 
org.apache.hyracks.api.util.JavaSerializationUtils; import org.apache.hyracks.control.nc.NodeControllerService; -======= -import org.apache.hyracks.api.util.HyracksThrowingAction; ->>>>>>> BRANCH (33db60 Merge branch 'gerrit/neo' into 'gerrit/trinity') import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; @@ -63,11 +59,8 @@ import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; -<<<<<<< HEAD (499290 [ASTERIXDB-3503][EXT] Improve logic of distributing files to) import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils; -======= import org.apache.hyracks.storage.am.lsm.common.api.IBatchController; ->>>>>>> BRANCH (33db60 Merge branch 'gerrit/neo' into 'gerrit/trinity') import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId; @@ -105,19 +98,13 @@ private final LSMTreeIndexAccessor[] lsmAccessorForUniqunessChecks; private final IFrameOperationCallback[] frameOpCallbacks; - private boolean flushedPartialTuples; -<<<<<<< HEAD (499290 [ASTERIXDB-3503][EXT] Improve logic of distributing files to) -======= - private int currentTupleIdx; - private int lastFlushedTupleIdx; - private IBatchController batchController; - ->>>>>>> BRANCH (33db60 Merge branch 'gerrit/neo' into 'gerrit/trinity') private final PermutingFrameTupleReference keyTuple; private final Int2ObjectMap<IntSet> partition2TuplesMap = new Int2ObjectOpenHashMap<>(); private final IntSet processedTuples = new IntOpenHashSet(); private final IntSet flushedTuples = new IntOpenHashSet(); private final SourceLocation sourceLoc; + private 
boolean flushedPartialTuples; + private IBatchController batchController; public LSMPrimaryInsertOperatorNodePushable(IHyracksTaskContext ctx, int partition, IIndexDataflowHelperFactory indexHelperFactory, IIndexDataflowHelperFactory keyIndexHelperFactory, @@ -162,7 +149,6 @@ // do nothing in the master branch } -<<<<<<< HEAD (499290 [ASTERIXDB-3503][EXT] Improve logic of distributing files to) protected void createTupleProcessors(SourceLocation sourceLoc) { for (int i = 0; i < partitions.length; i++) { LSMTreeIndexAccessor lsmAccessorForUniqunessCheck = lsmAccessorForUniqunessChecks[i]; @@ -177,28 +163,6 @@ if (processedTuples.contains(index)) { // already processed; skip return; -======= - protected IFrameTupleProcessor createTupleProcessor(SourceLocation sourceLoc) { - return new IFrameTupleProcessor() { - private HyracksThrowingAction exitAction; - - @Override - public void process(ITupleReference tuple, int index) throws HyracksDataException { - if (index < currentTupleIdx) { - // already processed; skip - return; - } - keyTuple.reset(accessor, index); - searchPred.reset(keyTuple, keyTuple, true, true, keySearchCmp, keySearchCmp); - boolean duplicate = false; - - lsmAccessorForUniqunessCheck.search(cursor, searchPred); - try { - if (cursor.hasNext()) { - // duplicate, skip - searchCallback.release(); - duplicate = true; ->>>>>>> BRANCH (33db60 Merge branch 'gerrit/neo' into 'gerrit/trinity') } switch (op) { case INSERT: @@ -271,7 +235,6 @@ try { INcApplicationContext appCtx = (INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext(); -<<<<<<< HEAD (499290 [ASTERIXDB-3503][EXT] Improve logic of distributing files to) writer.open(); writerOpen = true; for (int i = 0; i < partitions.length; i++) { @@ -325,11 +288,7 @@ searchPred = new RangePredicate(frameTuple, frameTuple, true, true, keySearchCmp, keySearchCmp, null, null); appender = new FrameTupleAppender(new VSizeFrame(ctx), true); frameTuple = new FrameTupleReference(); 
-======= - LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index, - appCtx.getTransactionSubsystem().getLogManager()); batchController = TaskUtil.getOrDefault(KEY_BATCH_CONTROLLER, ctx, StandardBatchController.INSTANCE); ->>>>>>> BRANCH (33db60 Merge branch 'gerrit/neo' into 'gerrit/trinity') } catch (Throwable e) { // NOSONAR: Re-thrown throw HyracksDataException.create(e); } @@ -338,7 +297,6 @@ @Override public void nextFrame(ByteBuffer buffer) throws HyracksDataException { accessor.reset(buffer); -<<<<<<< HEAD (499290 [ASTERIXDB-3503][EXT] Improve logic of distributing files to) partition2TuplesMap.clear(); int itemCount = accessor.getTupleCount(); for (int i = 0; i < itemCount; i++) { @@ -352,11 +310,9 @@ LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) indexAccessors[pIdx]; IFrameOperationCallback frameOpCallback = frameOpCallbacks[pIdx]; IFrameTupleProcessor processor = processors[pIdx]; - lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, p2tuplesMapEntry.getValue()); + lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, batchController, + p2tuplesMapEntry.getValue()); } -======= - lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, batchController); ->>>>>>> BRANCH (33db60 Merge branch 'gerrit/neo' into 'gerrit/trinity') writeBuffer.ensureFrameSize(buffer.capacity()); if (flushedPartialTuples) { diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java index 417958b..dd9f4070d 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java @@ -143,15 +143,12 @@ private final ITracer tracer; private final long 
traceCategory; private final ITupleProjector tupleProjector; - private long lastRecordInTimeStamp = 0L; -<<<<<<< HEAD (499290 [ASTERIXDB-3503][EXT] Improve logic of distributing files to) private final Int2ObjectMap<IntSet> partition2TuplesMap = new Int2ObjectOpenHashMap<>(); private final boolean hasSecondaries; private final ILSMTupleFilterCallbackFactory tupleFilterCallbackFactory; private final int[] fieldPermutation; -======= + private long lastRecordInTimeStamp = 0L; private IBatchController batchController; ->>>>>>> BRANCH (33db60 Merge branch 'gerrit/neo' into 'gerrit/trinity') public LSMPrimaryUpsertOperatorNodePushable(IHyracksTaskContext ctx, int partition, IIndexDataflowHelperFactory indexHelperFactory, int[] fieldPermutation, RecordDescriptor inputRecDesc, @@ -388,15 +385,9 @@ callback.open(); } }; -<<<<<<< HEAD (499290 [ASTERIXDB-3503][EXT] Improve logic of distributing files to) frameOpCallbacks[i].open(); -======= - frameOpCallback.open(); - batchController = TaskUtil.getOrDefault(KEY_BATCH_CONTROLLER, ctx, StandardBatchController.INSTANCE); - } catch (Throwable e) { // NOSONAR: Re-thrown - throw HyracksDataException.create(e); ->>>>>>> BRANCH (33db60 Merge branch 'gerrit/neo' into 'gerrit/trinity') } + batchController = TaskUtil.getOrDefault(KEY_BATCH_CONTROLLER, ctx, StandardBatchController.INSTANCE); } protected void resetSearchPredicate(int tupleIndex) { @@ -438,7 +429,6 @@ accessor.reset(buffer); partition2TuplesMap.clear(); int itemCount = accessor.getTupleCount(); -<<<<<<< HEAD (499290 [ASTERIXDB-3503][EXT] Improve logic of distributing files to) for (int i = 0; i < itemCount; i++) { int storagePartition = tuplePartitioner.partition(accessor, i); if (tupleFilterCallbacks[storagePartitionId2Index.get(storagePartition)].filter(accessor, i)) { @@ -457,11 +447,9 @@ LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) indexAccessors[pIdx]; IFrameOperationCallback frameOpCallback = frameOpCallbacks[pIdx]; IFrameTupleProcessor processor = 
processors[pIdx]; - lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, p2tuplesMapEntry.getValue()); + lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, batchController, + p2tuplesMapEntry.getValue()); } -======= - lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, batchController); ->>>>>>> BRANCH (33db60 Merge branch 'gerrit/neo' into 'gerrit/trinity') if (itemCount > 0) { lastRecordInTimeStamp = System.currentTimeMillis(); } @@ -609,7 +597,6 @@ // No op since nextFrame flushes by default } -<<<<<<< HEAD (499290 [ASTERIXDB-3503][EXT] Improve logic of distributing files to) // TODO: Refactor and remove duplicated code private void commitAtomicUpsert() throws HyracksDataException { final Map<String, ILSMComponentId> componentIdMap = new HashMap<>(); @@ -648,6 +635,5 @@ } } } -======= ->>>>>>> BRANCH (33db60 Merge branch 'gerrit/neo' into 'gerrit/trinity') + } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java index b5ad705..ad5e919 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java @@ -288,31 +288,6 @@ } } -<<<<<<< HEAD (499290 [ASTERIXDB-3503][EXT] Improve logic of distributing files to) - public static void tryWithCleanupsUnchecked(Runnable action, Runnable... 
cleanups) { - Throwable savedT = null; - try { - action.run(); - } catch (Throwable t) { - savedT = t; - } finally { - for (Runnable cleanup : cleanups) { - try { - cleanup.run(); - } catch (Throwable t) { - if (savedT != null) { - savedT.addSuppressed(t); - } else { - savedT = t; - } - } - } - } - if (savedT instanceof Error) { - throw (Error) savedT; - } else if (savedT != null) { - throw new UncheckedExecutionException(savedT); -======= @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions" }) // catching Throwable, instanceofs public static void tryHyracksWithCleanups(HyracksThrowingAction action, HyracksThrowingAction... cleanups) throws HyracksDataException { @@ -346,7 +321,32 @@ throw (Error) savedT; } else { throw HyracksDataException.create(savedT); ->>>>>>> BRANCH (33db60 Merge branch 'gerrit/neo' into 'gerrit/trinity') + } + } + + public static void tryWithCleanupsUnchecked(Runnable action, Runnable... cleanups) { + Throwable savedT = null; + try { + action.run(); + } catch (Throwable t) { + savedT = t; + } finally { + for (Runnable cleanup : cleanups) { + try { + cleanup.run(); + } catch (Throwable t) { + if (savedT != null) { + savedT.addSuppressed(t); + } else { + savedT = t; + } + } + } + } + if (savedT instanceof Error) { + throw (Error) savedT; + } else if (savedT != null) { + throw new UncheckedExecutionException(savedT); } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java index 1f77895..dbf34c9 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java @@ -215,7 +215,6 @@ /** * 
Perform batch operation on all tuples in the passed frame tuple accessor * -<<<<<<< HEAD (499290 [ASTERIXDB-3503][EXT] Improve logic of distributing files to) * @param ctx * the operation ctx * @param accessor @@ -228,27 +227,17 @@ * the callback at the end of the frame * @param tuples * the indexes of tuples to process -======= - * @param ctx the operation ctx - * @param accessor the frame tuple accessor - * @param tuple the mutable tuple used to pass the tuple to the processor - * @param processor the tuple processor - * @param frameOpCallback the callback at the end of the frame * @param batchController ->>>>>>> BRANCH (33db60 Merge branch 'gerrit/neo' into 'gerrit/trinity') + * the controller of the batch lifecycle * @throws HyracksDataException */ void batchOperate(ILSMIndexOperationContext ctx, FrameTupleAccessor accessor, FrameTupleReference tuple, -<<<<<<< HEAD (499290 [ASTERIXDB-3503][EXT] Improve logic of distributing files to) - IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, Set<Integer> tuples) -======= - IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, IBatchController batchController) - throws HyracksDataException; + IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, IBatchController batchController, + Set<Integer> tuples) throws HyracksDataException; void enter(ILSMIndexOperationContext ctx, LSMOperationType opType) throws HyracksDataException; void exit(ILSMIndexOperationContext ctx, IFrameOperationCallback callback, boolean success, LSMOperationType op) ->>>>>>> BRANCH (33db60 Merge branch 'gerrit/neo' into 'gerrit/trinity') throws HyracksDataException; /** diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java index f2509ff..d019a08 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java @@ -728,23 +728,15 @@ @Override public void batchOperate(ILSMIndexOperationContext ctx, FrameTupleAccessor accessor, FrameTupleReference tuple, -<<<<<<< HEAD (499290 [ASTERIXDB-3503][EXT] Improve logic of distributing files to) - IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, Set<Integer> tuples) -======= - IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, IBatchController batchController) ->>>>>>> BRANCH (33db60 Merge branch 'gerrit/neo' into 'gerrit/trinity') - throws HyracksDataException { + IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, IBatchController batchController, + Set<Integer> tuples) throws HyracksDataException { processor.start(); batchController.batchEnter(ctx, this, frameOpCallback); boolean success = false; try { try { -<<<<<<< HEAD (499290 [ASTERIXDB-3503][EXT] Improve logic of distributing files to) processFrame(accessor, tuple, processor, tuples); -======= - processFrame(accessor, tuple, processor); success = true; ->>>>>>> BRANCH (33db60 Merge branch 'gerrit/neo' into 'gerrit/trinity') frameOpCallback.frameCompleted(); } catch (Throwable th) { processor.fail(th); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java index c985a7b..c768768 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java +++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java @@ -212,13 +212,9 @@ } public void batchOperate(FrameTupleAccessor accessor, FrameTupleReference tuple, IFrameTupleProcessor processor, -<<<<<<< HEAD (499290 [ASTERIXDB-3503][EXT] Improve logic of distributing files to) - IFrameOperationCallback frameOpCallback, Set<Integer> tuples) throws HyracksDataException { - lsmHarness.batchOperate(ctx, accessor, tuple, processor, frameOpCallback, tuples); -======= - IFrameOperationCallback frameOpCallback, IBatchController batchController) throws HyracksDataException { - lsmHarness.batchOperate(ctx, accessor, tuple, processor, frameOpCallback, batchController); ->>>>>>> BRANCH (33db60 Merge branch 'gerrit/neo' into 'gerrit/trinity') + IFrameOperationCallback frameOpCallback, IBatchController batchController, Set<Integer> tuples) + throws HyracksDataException { + lsmHarness.batchOperate(ctx, accessor, tuple, processor, frameOpCallback, batchController, tuples); } @Override -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19097 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: goldfish Gerrit-Change-Id: Ia47a6630f71e75b181afe8cdddd5dcfdbbda4a05 Gerrit-Change-Number: 19097 Gerrit-PatchSet: 1 Gerrit-Owner: Michael Blow <mb...@apache.org> Gerrit-MessageType: newchange