From Michael Blow <mb...@apache.org>:

Michael Blow has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19097 )

Change subject: Merge branch 'gerrit/trinity' into 'gerrit/goldfish'
......................................................................

Merge branch 'gerrit/trinity' into 'gerrit/goldfish'

Ext-ref: MB-64229,MB-63741
Change-Id: Ia47a6630f71e75b181afe8cdddd5dcfdbbda4a05
---
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
7 files changed, 59 insertions(+), 124 deletions(-)

Approvals:
  Michael Blow: Looks good to me, approved; Verified




diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
index 4753cd1..9b8b103 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
@@ -40,12 +40,8 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.api.util.CleanupUtils;
-<<<<<<< HEAD   (499290 [ASTERIXDB-3503][EXT] Improve logic of distributing 
files to)
 import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.control.nc.NodeControllerService;
-=======
-import org.apache.hyracks.api.util.HyracksThrowingAction;
->>>>>>> BRANCH (33db60 Merge branch 'gerrit/neo' into 'gerrit/trinity')
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
@@ -63,11 +59,8 @@
 import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
-<<<<<<< HEAD   (499290 [ASTERIXDB-3503][EXT] Improve logic of distributing 
files to)
 import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils;
-=======
 import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
->>>>>>> BRANCH (33db60 Merge branch 'gerrit/neo' into 'gerrit/trinity')
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
@@ -105,19 +98,13 @@
     private final LSMTreeIndexAccessor[] lsmAccessorForUniqunessChecks;

     private final IFrameOperationCallback[] frameOpCallbacks;
-    private boolean flushedPartialTuples;
-<<<<<<< HEAD   (499290 [ASTERIXDB-3503][EXT] Improve logic of distributing 
files to)
-=======
-    private int currentTupleIdx;
-    private int lastFlushedTupleIdx;
-    private IBatchController batchController;
-
->>>>>>> BRANCH (33db60 Merge branch 'gerrit/neo' into 'gerrit/trinity')
     private final PermutingFrameTupleReference keyTuple;
     private final Int2ObjectMap<IntSet> partition2TuplesMap = new 
Int2ObjectOpenHashMap<>();
     private final IntSet processedTuples = new IntOpenHashSet();
     private final IntSet flushedTuples = new IntOpenHashSet();
     private final SourceLocation sourceLoc;
+    private boolean flushedPartialTuples;
+    private IBatchController batchController;

     public LSMPrimaryInsertOperatorNodePushable(IHyracksTaskContext ctx, int 
partition,
             IIndexDataflowHelperFactory indexHelperFactory, 
IIndexDataflowHelperFactory keyIndexHelperFactory,
@@ -162,7 +149,6 @@
         // do nothing in the master branch
     }

-<<<<<<< HEAD   (499290 [ASTERIXDB-3503][EXT] Improve logic of distributing 
files to)
     protected void createTupleProcessors(SourceLocation sourceLoc) {
         for (int i = 0; i < partitions.length; i++) {
             LSMTreeIndexAccessor lsmAccessorForUniqunessCheck = 
lsmAccessorForUniqunessChecks[i];
@@ -177,28 +163,6 @@
                     if (processedTuples.contains(index)) {
                         // already processed; skip
                         return;
-=======
-    protected IFrameTupleProcessor createTupleProcessor(SourceLocation 
sourceLoc) {
-        return new IFrameTupleProcessor() {
-            private HyracksThrowingAction exitAction;
-
-            @Override
-            public void process(ITupleReference tuple, int index) throws 
HyracksDataException {
-                if (index < currentTupleIdx) {
-                    // already processed; skip
-                    return;
-                }
-                keyTuple.reset(accessor, index);
-                searchPred.reset(keyTuple, keyTuple, true, true, keySearchCmp, 
keySearchCmp);
-                boolean duplicate = false;
-
-                lsmAccessorForUniqunessCheck.search(cursor, searchPred);
-                try {
-                    if (cursor.hasNext()) {
-                        // duplicate, skip
-                        searchCallback.release();
-                        duplicate = true;
->>>>>>> BRANCH (33db60 Merge branch 'gerrit/neo' into 'gerrit/trinity')
                     }
                     switch (op) {
                         case INSERT:
@@ -271,7 +235,6 @@
         try {
             INcApplicationContext appCtx =
                     (INcApplicationContext) 
ctx.getJobletContext().getServiceContext().getApplicationContext();
-<<<<<<< HEAD   (499290 [ASTERIXDB-3503][EXT] Improve logic of distributing 
files to)
             writer.open();
             writerOpen = true;
             for (int i = 0; i < partitions.length; i++) {
@@ -325,11 +288,7 @@
             searchPred = new RangePredicate(frameTuple, frameTuple, true, 
true, keySearchCmp, keySearchCmp, null, null);
             appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
             frameTuple = new FrameTupleReference();
-=======
-            LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index,
-                    appCtx.getTransactionSubsystem().getLogManager());
             batchController = TaskUtil.getOrDefault(KEY_BATCH_CONTROLLER, ctx, 
StandardBatchController.INSTANCE);
->>>>>>> BRANCH (33db60 Merge branch 'gerrit/neo' into 'gerrit/trinity')
         } catch (Throwable e) { // NOSONAR: Re-thrown
             throw HyracksDataException.create(e);
         }
@@ -338,7 +297,6 @@
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         accessor.reset(buffer);
-<<<<<<< HEAD   (499290 [ASTERIXDB-3503][EXT] Improve logic of distributing 
files to)
         partition2TuplesMap.clear();
         int itemCount = accessor.getTupleCount();
         for (int i = 0; i < itemCount; i++) {
@@ -352,11 +310,9 @@
             LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) 
indexAccessors[pIdx];
             IFrameOperationCallback frameOpCallback = frameOpCallbacks[pIdx];
             IFrameTupleProcessor processor = processors[pIdx];
-            lsmAccessor.batchOperate(accessor, tuple, processor, 
frameOpCallback, p2tuplesMapEntry.getValue());
+            lsmAccessor.batchOperate(accessor, tuple, processor, 
frameOpCallback, batchController,
+                    p2tuplesMapEntry.getValue());
         }
-=======
-        lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, 
batchController);
->>>>>>> BRANCH (33db60 Merge branch 'gerrit/neo' into 'gerrit/trinity')

         writeBuffer.ensureFrameSize(buffer.capacity());
         if (flushedPartialTuples) {
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 417958b..dd9f4070d 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -143,15 +143,12 @@
     private final ITracer tracer;
     private final long traceCategory;
     private final ITupleProjector tupleProjector;
-    private long lastRecordInTimeStamp = 0L;
-<<<<<<< HEAD   (499290 [ASTERIXDB-3503][EXT] Improve logic of distributing 
files to)
     private final Int2ObjectMap<IntSet> partition2TuplesMap = new 
Int2ObjectOpenHashMap<>();
     private final boolean hasSecondaries;
     private final ILSMTupleFilterCallbackFactory tupleFilterCallbackFactory;
     private final int[] fieldPermutation;
-=======
+    private long lastRecordInTimeStamp = 0L;
     private IBatchController batchController;
->>>>>>> BRANCH (33db60 Merge branch 'gerrit/neo' into 'gerrit/trinity')

     public LSMPrimaryUpsertOperatorNodePushable(IHyracksTaskContext ctx, int 
partition,
             IIndexDataflowHelperFactory indexHelperFactory, int[] 
fieldPermutation, RecordDescriptor inputRecDesc,
@@ -388,15 +385,9 @@
                     callback.open();
                 }
             };
-<<<<<<< HEAD   (499290 [ASTERIXDB-3503][EXT] Improve logic of distributing 
files to)
             frameOpCallbacks[i].open();
-=======
-            frameOpCallback.open();
-            batchController = TaskUtil.getOrDefault(KEY_BATCH_CONTROLLER, ctx, 
StandardBatchController.INSTANCE);
-        } catch (Throwable e) { // NOSONAR: Re-thrown
-            throw HyracksDataException.create(e);
->>>>>>> BRANCH (33db60 Merge branch 'gerrit/neo' into 'gerrit/trinity')
         }
+        batchController = TaskUtil.getOrDefault(KEY_BATCH_CONTROLLER, ctx, 
StandardBatchController.INSTANCE);
     }

     protected void resetSearchPredicate(int tupleIndex) {
@@ -438,7 +429,6 @@
         accessor.reset(buffer);
         partition2TuplesMap.clear();
         int itemCount = accessor.getTupleCount();
-<<<<<<< HEAD   (499290 [ASTERIXDB-3503][EXT] Improve logic of distributing 
files to)
         for (int i = 0; i < itemCount; i++) {
             int storagePartition = tuplePartitioner.partition(accessor, i);
             if 
(tupleFilterCallbacks[storagePartitionId2Index.get(storagePartition)].filter(accessor,
 i)) {
@@ -457,11 +447,9 @@
             LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) 
indexAccessors[pIdx];
             IFrameOperationCallback frameOpCallback = frameOpCallbacks[pIdx];
             IFrameTupleProcessor processor = processors[pIdx];
-            lsmAccessor.batchOperate(accessor, tuple, processor, 
frameOpCallback, p2tuplesMapEntry.getValue());
+            lsmAccessor.batchOperate(accessor, tuple, processor, 
frameOpCallback, batchController,
+                    p2tuplesMapEntry.getValue());
         }
-=======
-        lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, 
batchController);
->>>>>>> BRANCH (33db60 Merge branch 'gerrit/neo' into 'gerrit/trinity')
         if (itemCount > 0) {
             lastRecordInTimeStamp = System.currentTimeMillis();
         }
@@ -609,7 +597,6 @@
         // No op since nextFrame flushes by default
     }

-<<<<<<< HEAD   (499290 [ASTERIXDB-3503][EXT] Improve logic of distributing 
files to)
     // TODO: Refactor and remove duplicated code
     private void commitAtomicUpsert() throws HyracksDataException {
         final Map<String, ILSMComponentId> componentIdMap = new HashMap<>();
@@ -648,6 +635,5 @@
             }
         }
     }
-=======
->>>>>>> BRANCH (33db60 Merge branch 'gerrit/neo' into 'gerrit/trinity')
+
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
index b5ad705..ad5e919 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
@@ -288,31 +288,6 @@
         }
     }

-<<<<<<< HEAD   (499290 [ASTERIXDB-3503][EXT] Improve logic of distributing 
files to)
-    public static void tryWithCleanupsUnchecked(Runnable action, Runnable... 
cleanups) {
-        Throwable savedT = null;
-        try {
-            action.run();
-        } catch (Throwable t) {
-            savedT = t;
-        } finally {
-            for (Runnable cleanup : cleanups) {
-                try {
-                    cleanup.run();
-                } catch (Throwable t) {
-                    if (savedT != null) {
-                        savedT.addSuppressed(t);
-                    } else {
-                        savedT = t;
-                    }
-                }
-            }
-        }
-        if (savedT instanceof Error) {
-            throw (Error) savedT;
-        } else if (savedT != null) {
-            throw new UncheckedExecutionException(savedT);
-=======
     @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions" }) 
// catching Throwable, instanceofs
     public static void tryHyracksWithCleanups(HyracksThrowingAction action, 
HyracksThrowingAction... cleanups)
             throws HyracksDataException {
@@ -346,7 +321,32 @@
             throw (Error) savedT;
         } else {
             throw HyracksDataException.create(savedT);
->>>>>>> BRANCH (33db60 Merge branch 'gerrit/neo' into 'gerrit/trinity')
+        }
+    }
+
+    public static void tryWithCleanupsUnchecked(Runnable action, Runnable... 
cleanups) {
+        Throwable savedT = null;
+        try {
+            action.run();
+        } catch (Throwable t) {
+            savedT = t;
+        } finally {
+            for (Runnable cleanup : cleanups) {
+                try {
+                    cleanup.run();
+                } catch (Throwable t) {
+                    if (savedT != null) {
+                        savedT.addSuppressed(t);
+                    } else {
+                        savedT = t;
+                    }
+                }
+            }
+        }
+        if (savedT instanceof Error) {
+            throw (Error) savedT;
+        } else if (savedT != null) {
+            throw new UncheckedExecutionException(savedT);
         }
     }

diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index 1f77895..dbf34c9 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -215,7 +215,6 @@
     /**
      * Perform batch operation on all tuples in the passed frame tuple accessor
      *
-<<<<<<< HEAD   (499290 [ASTERIXDB-3503][EXT] Improve logic of distributing 
files to)
      * @param ctx
      *            the operation ctx
      * @param accessor
@@ -228,27 +227,17 @@
      *            the callback at the end of the frame
      * @param tuples
      *            the indexes of tuples to process
-=======
-     * @param ctx             the operation ctx
-     * @param accessor        the frame tuple accessor
-     * @param tuple           the mutable tuple used to pass the tuple to the 
processor
-     * @param processor       the tuple processor
-     * @param frameOpCallback the callback at the end of the frame
      * @param batchController
->>>>>>> BRANCH (33db60 Merge branch 'gerrit/neo' into 'gerrit/trinity')
+     *            the controller of the batch lifecycle
      * @throws HyracksDataException
      */
     void batchOperate(ILSMIndexOperationContext ctx, FrameTupleAccessor 
accessor, FrameTupleReference tuple,
-<<<<<<< HEAD   (499290 [ASTERIXDB-3503][EXT] Improve logic of distributing 
files to)
-            IFrameTupleProcessor processor, IFrameOperationCallback 
frameOpCallback, Set<Integer> tuples)
-=======
-            IFrameTupleProcessor processor, IFrameOperationCallback 
frameOpCallback, IBatchController batchController)
-            throws HyracksDataException;
+            IFrameTupleProcessor processor, IFrameOperationCallback 
frameOpCallback, IBatchController batchController,
+            Set<Integer> tuples) throws HyracksDataException;

     void enter(ILSMIndexOperationContext ctx, LSMOperationType opType) throws 
HyracksDataException;

     void exit(ILSMIndexOperationContext ctx, IFrameOperationCallback callback, 
boolean success, LSMOperationType op)
->>>>>>> BRANCH (33db60 Merge branch 'gerrit/neo' into 'gerrit/trinity')
             throws HyracksDataException;

     /**
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index f2509ff..d019a08 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -728,23 +728,15 @@

     @Override
     public void batchOperate(ILSMIndexOperationContext ctx, FrameTupleAccessor 
accessor, FrameTupleReference tuple,
-<<<<<<< HEAD   (499290 [ASTERIXDB-3503][EXT] Improve logic of distributing 
files to)
-            IFrameTupleProcessor processor, IFrameOperationCallback 
frameOpCallback, Set<Integer> tuples)
-=======
-            IFrameTupleProcessor processor, IFrameOperationCallback 
frameOpCallback, IBatchController batchController)
->>>>>>> BRANCH (33db60 Merge branch 'gerrit/neo' into 'gerrit/trinity')
-            throws HyracksDataException {
+            IFrameTupleProcessor processor, IFrameOperationCallback 
frameOpCallback, IBatchController batchController,
+            Set<Integer> tuples) throws HyracksDataException {
         processor.start();
         batchController.batchEnter(ctx, this, frameOpCallback);
         boolean success = false;
         try {
             try {
-<<<<<<< HEAD   (499290 [ASTERIXDB-3503][EXT] Improve logic of distributing 
files to)
                 processFrame(accessor, tuple, processor, tuples);
-=======
-                processFrame(accessor, tuple, processor);
                 success = true;
->>>>>>> BRANCH (33db60 Merge branch 'gerrit/neo' into 'gerrit/trinity')
                 frameOpCallback.frameCompleted();
             } catch (Throwable th) {
                 processor.fail(th);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index c985a7b..c768768 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -212,13 +212,9 @@
     }

     public void batchOperate(FrameTupleAccessor accessor, FrameTupleReference 
tuple, IFrameTupleProcessor processor,
-<<<<<<< HEAD   (499290 [ASTERIXDB-3503][EXT] Improve logic of distributing 
files to)
-            IFrameOperationCallback frameOpCallback, Set<Integer> tuples) 
throws HyracksDataException {
-        lsmHarness.batchOperate(ctx, accessor, tuple, processor, 
frameOpCallback, tuples);
-=======
-            IFrameOperationCallback frameOpCallback, IBatchController 
batchController) throws HyracksDataException {
-        lsmHarness.batchOperate(ctx, accessor, tuple, processor, 
frameOpCallback, batchController);
->>>>>>> BRANCH (33db60 Merge branch 'gerrit/neo' into 'gerrit/trinity')
+            IFrameOperationCallback frameOpCallback, IBatchController 
batchController, Set<Integer> tuples)
+            throws HyracksDataException {
+        lsmHarness.batchOperate(ctx, accessor, tuple, processor, 
frameOpCallback, batchController, tuples);
     }

     @Override

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19097
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: goldfish
Gerrit-Change-Id: Ia47a6630f71e75b181afe8cdddd5dcfdbbda4a05
Gerrit-Change-Number: 19097
Gerrit-PatchSet: 2
Gerrit-Owner: Michael Blow <mb...@apache.org>
Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mb...@apache.org>
Gerrit-CC: Anon. E. Moose #1000171
Gerrit-MessageType: merged

Reply via email to