Taewoo Kim has submitted this change and it was merged.

Change subject: Index-Only Plan Patch Set 4: Index SearchCursor adjustment
......................................................................


Index-Only Plan Patch Set 4: Index SearchCursor adjustment

 - Modify the search cursors to maintain the result of proceed()
   since the result will be required to check whether the given tuple
   is qualified to be part of an index-only plan result or not.
   More details will be followed in the next patch set.
 - Fix the search cursors to call cancel() correctly.

Change-Id: I299b1858b7875ffc116f8f3115d319fe7b53a537
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1630
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyin...@gmail.com>
---
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursor.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
6 files changed, 175 insertions(+), 103 deletions(-)

Approvals:
  Yingyi Bu: Looks good to me, approved
  Jenkins: Verified; No violations found; Verified

Objections:
  Jenkins: Violations found



diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
index 890823c..c319b64 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
@@ -60,9 +60,18 @@
         }
     }
 
+    /**
+     * Cancels the reconcile() operation. Since reconcile() gets a lock, this 
lock
+     * needs to be unlocked to reverse the effect of reconcile().
+     */
     @Override
     public void cancel(ITupleReference tuple) throws HyracksDataException {
-        //no op
+        int pkHash = computePrimaryKeyHashValue(tuple, primaryKeyFields);
+        try {
+            lockManager.unlock(datasetId, pkHash, LockMode.S, txnCtx);
+        } catch (ACIDException e) {
+            throw new HyracksDataException(e);
+        }
     }
 
     @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java
index 77c3573..99c3e3f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java
@@ -73,11 +73,14 @@
             btreeAccessors[i].search(rangeCursors[i], predicate);
             if (rangeCursors[i].hasNext()) {
                 rangeCursors[i].next();
-                // We use the predicate's to lock the key instead of the tuple 
that we get from cursor to avoid copying the tuple when we do the "unlatch 
dance"
+                // We use the predicate's to lock the key instead of the tuple 
that we get from cursor
+                // to avoid copying the tuple when we do the "unlatch dance".
                 if (reconciled || 
searchCallback.proceed(predicate.getLowKey())) {
                     // if proceed is successful, then there's no need for 
doing the "unlatch dance"
                     if (((ILSMTreeTupleReference) 
rangeCursors[i].getTuple()).isAntimatter()) {
-                        searchCallback.cancel(predicate.getLowKey());
+                        if (reconciled) {
+                            searchCallback.cancel(predicate.getLowKey());
+                        }
                         rangeCursors[i].close();
                         return false;
                     } else {
@@ -94,7 +97,6 @@
 
                     // retraverse
                     btreeAccessors[0].search(rangeCursors[i], predicate);
-                    searchCallback.complete(predicate.getLowKey());
                     if (rangeCursors[i].hasNext()) {
                         rangeCursors[i].next();
                         if (((ILSMTreeTupleReference) 
rangeCursors[i].getTuple()).isAntimatter()) {
@@ -104,9 +106,11 @@
                         } else {
                             frameTuple = rangeCursors[i].getTuple();
                             foundTuple = true;
+                            searchCallback.complete(predicate.getLowKey());
                             return true;
                         }
                     } else {
+                        searchCallback.cancel(predicate.getLowKey());
                         rangeCursors[i].close();
                     }
                 } else {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
index 1b8c151..3199cb6 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
@@ -48,7 +48,8 @@
     private RangePredicate predicate;
     private BTreeAccessor[] btreeAccessors;
     private ArrayTupleBuilder tupleBuilder;
-    private boolean proceed = true;
+    private boolean canCallProceed = true;
+    private boolean resultOfSearchCallBackProceed = false;
 
     public LSMBTreeRangeSearchCursor(ILSMIndexOperationContext opCtx) {
         this(opCtx, false);
@@ -63,84 +64,115 @@
     @Override
     public void reset() throws HyracksDataException {
         super.reset();
-        proceed = true;
+        canCallProceed = true;
     }
 
     @Override
     public void next() throws HyracksDataException {
         outputElement = outputPriorityQueue.poll();
-        needPush = true;
-        proceed = false;
+        needPushElementIntoQueue = true;
+        canCallProceed = false;
     }
 
+    /**
+     * Checks the priority queue and resets and the top element if required.
+     * PriorityQueue can hold one element from each cursor.
+     * The boolean variable canCallProceedMethod controls whether we can call 
proceed() method for this element.
+     * i.e. it can return this element if proceed() succeeds.
+     * If proceed fails, that is most-likely that there is ongoing operations 
in the in-memory component.
+     * After resolving in-memory component issue, it progresses again.
+     * Also, in order to not release the same element again, it keeps the 
previous output and checks it
+     * against the current head in the queue.
+     */
     @Override
     protected void checkPriorityQueue() throws HyracksDataException {
-        while (!outputPriorityQueue.isEmpty() || needPush == true) {
+        while (!outputPriorityQueue.isEmpty() || needPushElementIntoQueue == 
true) {
             if (!outputPriorityQueue.isEmpty()) {
                 PriorityQueueElement checkElement = outputPriorityQueue.peek();
-                if (proceed && 
!searchCallback.proceed(checkElement.getTuple())) {
-                    if (includeMutableComponent) {
-                        PriorityQueueElement mutableElement = null;
-                        boolean mutableElementFound = false;
-                        // scan the PQ for the mutable component's element
-                        Iterator<PriorityQueueElement> it = 
outputPriorityQueue.iterator();
-                        while (it.hasNext()) {
-                            mutableElement = it.next();
-                            if (mutableElement.getCursorIndex() == 0) {
-                                mutableElementFound = true;
-                                it.remove();
-                                break;
-                            }
-                        }
-                        if (mutableElementFound) {
-                            // copy the in-mem tuple
-                            if (tupleBuilder == null) {
-                                tupleBuilder = new 
ArrayTupleBuilder(cmp.getKeyFieldCount());
-                            }
-                            TupleUtils.copyTuple(tupleBuilder, 
mutableElement.getTuple(), cmp.getKeyFieldCount());
-                            copyTuple.reset(tupleBuilder.getFieldEndOffsets(), 
tupleBuilder.getByteArray());
-
-                            // unlatch/unpin
-                            rangeCursors[0].reset();
-
-                            // reconcile
-                            if (checkElement.getCursorIndex() == 0) {
-                                searchCallback.reconcile(copyTuple);
-                            } else {
-                                
searchCallback.reconcile(checkElement.getTuple());
-                                
searchCallback.complete(checkElement.getTuple());
-                            }
-                            // retraverse
-                            reusablePred.setLowKey(copyTuple, true);
-                            btreeAccessors[0].search(rangeCursors[0], 
reusablePred);
-                            boolean isNotExhaustedCursor = 
pushIntoPriorityQueue(mutableElement);
-
-                            if (checkElement.getCursorIndex() == 0) {
-                                if (!isNotExhaustedCursor || 
cmp.compare(copyTuple, mutableElement.getTuple()) != 0) {
-                                    searchCallback.complete(copyTuple);
-                                    searchCallback.cancel(copyTuple);
-                                    continue;
+                if (canCallProceed) {
+                    resultOfSearchCallBackProceed = 
searchCallback.proceed(checkElement.getTuple());
+                    if (!resultOfSearchCallBackProceed) {
+                        // In case proceed() fails and there is an in-memory 
component,
+                        // we can't simply use this element since there might 
be a change.
+                        if (includeMutableComponent) {
+                            PriorityQueueElement mutableElement = null;
+                            boolean mutableElementFound = false;
+                            // Scans the PQ for the mutable component's 
element and delete it
+                            // since it can be changed.
+                            // (i.e. we can't ensure that the element is the 
most current one.)
+                            Iterator<PriorityQueueElement> it = 
outputPriorityQueue.iterator();
+                            while (it.hasNext()) {
+                                mutableElement = it.next();
+                                if (mutableElement.getCursorIndex() == 0) {
+                                    mutableElementFound = true;
+                                    it.remove();
+                                    break;
                                 }
-                                searchCallback.complete(copyTuple);
+                            }
+                            if (mutableElementFound) {
+                                // Copies the in-memory tuple.
+                                if (tupleBuilder == null) {
+                                    tupleBuilder = new 
ArrayTupleBuilder(cmp.getKeyFieldCount());
+                                }
+                                TupleUtils.copyTuple(tupleBuilder, 
mutableElement.getTuple(), cmp.getKeyFieldCount());
+                                
copyTuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
+
+                                // Unlatches/unpins the leaf page of the index.
+                                rangeCursors[0].reset();
+
+                                // Tries to reconcile.
+                                if (checkElement.getCursorIndex() == 0) {
+                                    searchCallback.reconcile(copyTuple);
+                                } else {
+                                    // If this element is from the disk 
component, we can call complete()
+                                    // after reconcile() since we can 
guarantee that there is no change.
+                                    
searchCallback.reconcile(checkElement.getTuple());
+                                    
searchCallback.complete(checkElement.getTuple());
+                                }
+                                // Re-traverses the index.
+                                reusablePred.setLowKey(copyTuple, true);
+                                btreeAccessors[0].search(rangeCursors[0], 
reusablePred);
+                                boolean isNotExhaustedCursor =
+                                        
pushIntoQueueFromCursorAndReplaceThisElement(mutableElement);
+
+                                if (checkElement.getCursorIndex() == 0) {
+                                    if (!isNotExhaustedCursor
+                                            || cmp.compare(copyTuple, 
mutableElement.getTuple()) != 0) {
+                                        // The searched key no longer exists. 
We call cancel() to
+                                        // reverse the effect of reconcile() 
method.
+                                        searchCallback.cancel(copyTuple);
+                                        continue;
+                                    }
+                                    // The searched key is still there.
+                                    // TODO: do we need to call or not call 
complete() in this case?
+                                    searchCallback.complete(copyTuple);
+                                }
+                            } else {
+                                // The mutable cursor is exhausted and it 
couldn't find the element.
+                                // The failed element did not come from the 
in-memory component.
+                                
searchCallback.reconcile(checkElement.getTuple());
                             }
                         } else {
-                            // the mutable cursor is exhausted
+                            // proceed() failed. However, there is no 
in-memory component.
+                            // So just call reconcile.
                             searchCallback.reconcile(checkElement.getTuple());
                         }
-                    } else {
-                        searchCallback.reconcile(checkElement.getTuple());
                     }
                 }
-                // If there is no previous tuple or the previous tuple can be 
ignored
+
+                // If there is no previous tuple or the previous tuple can be 
ignored.
+                // This check is needed not to release the same tuple again.
                 if (outputElement == null) {
                     if (isDeleted(checkElement) && !returnDeletedTuples) {
                         // If the key has been deleted then pop it and set 
needPush to true.
                         // We cannot push immediately because the tuple may be
                         // modified if hasNext() is called
                         outputElement = outputPriorityQueue.poll();
-                        searchCallback.cancel(checkElement.getTuple());
-                        needPush = true;
-                        proceed = false;
+                        if (!resultOfSearchCallBackProceed) {
+                            searchCallback.cancel(checkElement.getTuple());
+                        }
+                        needPushElementIntoQueue = true;
+                        canCallProceed = false;
                     } else {
                         break;
                     }
@@ -154,24 +186,24 @@
 
                         // the head element of PQ is useless now
                         PriorityQueueElement e = outputPriorityQueue.poll();
-                        pushIntoPriorityQueue(e);
+                        pushIntoQueueFromCursorAndReplaceThisElement(e);
                     } else {
                         // If the previous tuple and the head tuple are 
different
                         // the info of previous tuple is useless
-                        if (needPush == true) {
-                            pushIntoPriorityQueue(outputElement);
-                            needPush = false;
+                        if (needPushElementIntoQueue == true) {
+                            
pushIntoQueueFromCursorAndReplaceThisElement(outputElement);
+                            needPushElementIntoQueue = false;
                         }
-                        proceed = true;
+                        canCallProceed = true;
                         outputElement = null;
                     }
                 }
             } else {
                 // the priority queue is empty and needPush
-                pushIntoPriorityQueue(outputElement);
-                needPush = false;
+                pushIntoQueueFromCursorAndReplaceThisElement(outputElement);
+                needPushElementIntoQueue = false;
                 outputElement = null;
-                proceed = true;
+                canCallProceed = true;
             }
         }
     }
@@ -224,6 +256,6 @@
         }
         setPriorityQueueComparator();
         initPriorityQueue();
-        proceed = true;
+        canCallProceed = true;
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
index 4c3a577..b8d21e4 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
@@ -44,7 +44,7 @@
     protected PriorityQueue<PriorityQueueElement> outputPriorityQueue;
     protected PriorityQueueComparator pqCmp;
     protected MultiComparator cmp;
-    protected boolean needPush;
+    protected boolean needPushElementIntoQueue;
     protected boolean includeMutableComponent;
     protected ILSMHarness lsmHarness;
 
@@ -54,7 +54,7 @@
         this.opCtx = opCtx;
         this.returnDeletedTuples = returnDeletedTuples;
         outputElement = null;
-        needPush = false;
+        needPushElementIntoQueue = false;
     }
 
     public ILSMIndexOperationContext getOpCtx() {
@@ -70,7 +70,7 @@
                 pqes[i] = new PriorityQueueElement(i);
             }
             for (int i = 0; i < rangeCursors.length; i++) {
-                pushIntoPriorityQueue(pqes[i]);
+                pushIntoQueueFromCursorAndReplaceThisElement(pqes[i]);
             }
         } else {
             outputPriorityQueue.clear();
@@ -79,14 +79,14 @@
                 // size is the same -> re-use
                 for (int i = 0; i < rangeCursors.length; i++) {
                     pqes[i].reset(null);
-                    pushIntoPriorityQueue(pqes[i]);
+                    pushIntoQueueFromCursorAndReplaceThisElement(pqes[i]);
                 }
             } else {
                 // size changed (due to flushes, merges, etc) -> re-create
                 pqes = new PriorityQueueElement[pqInitSize];
                 for (int i = 0; i < rangeCursors.length; i++) {
                     pqes[i] = new PriorityQueueElement(i);
-                    pushIntoPriorityQueue(pqes[i]);
+                    pushIntoQueueFromCursorAndReplaceThisElement(pqes[i]);
                 }
             }
         }
@@ -99,7 +99,7 @@
     @Override
     public void reset() throws HyracksDataException {
         outputElement = null;
-        needPush = false;
+        needPushElementIntoQueue = false;
 
         try {
             if (outputPriorityQueue != null) {
@@ -128,7 +128,7 @@
     @Override
     public void next() throws HyracksDataException {
         outputElement = outputPriorityQueue.poll();
-        needPush = true;
+        needPushElementIntoQueue = true;
     }
 
     @Override
@@ -169,7 +169,7 @@
         return outputElement.getTuple();
     }
 
-    protected boolean pushIntoPriorityQueue(PriorityQueueElement e) throws 
HyracksDataException {
+    protected boolean 
pushIntoQueueFromCursorAndReplaceThisElement(PriorityQueueElement e) throws 
HyracksDataException {
         int cursorIndex = e.getCursorIndex();
         if (rangeCursors[cursorIndex].hasNext()) {
             rangeCursors[cursorIndex].next();
@@ -186,7 +186,7 @@
     }
 
     protected void checkPriorityQueue() throws HyracksDataException {
-        while (!outputPriorityQueue.isEmpty() || (needPush == true)) {
+        while (!outputPriorityQueue.isEmpty() || (needPushElementIntoQueue == 
true)) {
             if (!outputPriorityQueue.isEmpty()) {
                 PriorityQueueElement checkElement = outputPriorityQueue.peek();
                 // If there is no previous tuple or the previous tuple can be 
ignored
@@ -196,7 +196,7 @@
                         // We cannot push immediately because the tuple may be
                         // modified if hasNext() is called
                         outputElement = outputPriorityQueue.poll();
-                        needPush = true;
+                        needPushElementIntoQueue = true;
                     } else {
                         break;
                     }
@@ -210,21 +210,21 @@
 
                         // the head element of PQ is useless now
                         PriorityQueueElement e = outputPriorityQueue.poll();
-                        pushIntoPriorityQueue(e);
+                        pushIntoQueueFromCursorAndReplaceThisElement(e);
                     } else {
                         // If the previous tuple and the head tuple are 
different
                         // the info of previous tuple is useless
-                        if (needPush == true) {
-                            pushIntoPriorityQueue(outputElement);
-                            needPush = false;
+                        if (needPushElementIntoQueue == true) {
+                            
pushIntoQueueFromCursorAndReplaceThisElement(outputElement);
+                            needPushElementIntoQueue = false;
                         }
                         outputElement = null;
                     }
                 }
             } else {
                 // the priority queue is empty and needPush
-                pushIntoPriorityQueue(outputElement);
-                needPush = false;
+                pushIntoQueueFromCursorAndReplaceThisElement(outputElement);
+                needPushElementIntoQueue = false;
                 outputElement = null;
             }
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursor.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursor.java
index 607f957..9352fbb 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursor.java
@@ -58,6 +58,8 @@
     private ILSMIndexOperationContext opCtx;
 
     private List<ILSMComponent> operationalComponents;
+    private ITupleReference currentTuple = null;
+    private boolean resultOfSearchCallBackProceed = false;
 
     @Override
     public void open(ICursorInitialState initialState, ISearchPredicate 
searchPred) throws HyracksDataException {
@@ -111,9 +113,21 @@
     private boolean nextValidTuple() throws HyracksDataException {
         while (currentCursor.hasNext()) {
             currentCursor.next();
-            if (!isDeleted(currentCursor.getTuple())) {
+            currentTuple = currentCursor.getTuple();
+            resultOfSearchCallBackProceed = 
searchCallback.proceed(currentTuple);
+
+            if (!resultOfSearchCallBackProceed) {
+                // We assume that the underlying cursors materialize their 
results such that
+                // there is no need to reposition the result cursor after 
reconciliation.
+                searchCallback.reconcile(currentTuple);
+            }
+
+            if (!isDeleted(currentTuple)) {
                 tupleConsumed = false;
                 return true;
+            } else if (!resultOfSearchCallBackProceed) {
+                // reconcile & tuple deleted case: needs to cancel the effect 
of reconcile().
+                searchCallback.cancel(currentTuple);
             }
         }
         return false;
@@ -150,11 +164,6 @@
     public void next() throws HyracksDataException {
         // Mark the tuple as consumed, so hasNext() can move on.
         tupleConsumed = true;
-        // We assume that the underlying cursors materialize their results 
such that
-        // there is no need to reposition the result cursor after 
reconciliation.
-        if (!searchCallback.proceed(currentCursor.getTuple())) {
-            searchCallback.reconcile(currentCursor.getTuple());
-        }
     }
 
     @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
index 98ab803..21c099b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
@@ -26,6 +26,7 @@
 import org.apache.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
 import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
 import org.apache.hyracks.storage.am.common.api.ICursorInitialState;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
 import org.apache.hyracks.storage.am.common.api.ISearchPredicate;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexAccessor;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
@@ -56,6 +57,7 @@
     private SearchPredicate rtreeSearchPredicate;
     private int numMutableComponents;
     private boolean open;
+    protected ISearchOperationCallback searchCallback;
 
     public LSMRTreeWithAntiMatterTuplesSearchCursor(ILSMIndexOperationContext 
opCtx) {
         this(opCtx, false);
@@ -75,6 +77,7 @@
         comparatorFields = lsmInitialState.getComparatorFields();
         operationalComponents = lsmInitialState.getOperationalComponents();
         rtreeSearchPredicate = (SearchPredicate) searchPred;
+        searchCallback = lsmInitialState.getSearchOperationCallback();
 
         includeMutableComponent = false;
         numMutableComponents = 0;
@@ -147,7 +150,13 @@
                 while (mutableRTreeCursors[currentCursor].hasNext()) {
                     mutableRTreeCursors[currentCursor].next();
                     ITupleReference currentTuple = 
mutableRTreeCursors[currentCursor].getTuple();
+                    // TODO: at this time, we only add proceed() part.
+                    // reconcile() and complete() can be added later after 
considering the semantics.
+
+                    // Call proceed() to do necessary operations before 
returning this tuple.
+                    searchCallback.proceed(currentTuple);
                     if (searchMemBTrees(currentTuple, currentCursor)) {
+                        // anti-matter tuple is NOT found
                         foundNext = true;
                         frameTuple = currentTuple;
                         return true;
@@ -160,14 +169,33 @@
             while (super.hasNext()) {
                 super.next();
                 ITupleReference diskRTreeTuple = super.getTuple();
+                // TODO: at this time, we only add proceed().
+                // reconcile() and complete() can be added later after 
considering the semantics.
+
+                // Call proceed() to do necessary operations before returning 
this tuple.
+                searchCallback.proceed(diskRTreeTuple);
                 if (searchMemBTrees(diskRTreeTuple, numMutableComponents)) {
+                    // anti-matter tuple is NOT found
                     foundNext = true;
                     frameTuple = diskRTreeTuple;
                     return true;
                 }
             }
         } else {
-            return super.hasNext();
+            if (super.hasNext()) {
+                super.next();
+                ITupleReference diskRTreeTuple = super.getTuple();
+
+                // TODO: at this time, we only add proceed() part.
+                // reconcile() and complete() can be added later after 
considering the semantics.
+
+                // Call proceed() to do necessary operations before returning 
this tuple.
+                // Since in-memory components don't exist, we can skip 
searching in-memory B-Trees.
+                searchCallback.proceed(diskRTreeTuple);
+                foundNext = true;
+                frameTuple = diskRTreeTuple;
+                return true;
+            }
         }
 
         return false;
@@ -175,22 +203,12 @@
 
     @Override
     public void next() throws HyracksDataException {
-        if (includeMutableComponent) {
-            foundNext = false;
-        } else {
-            super.next();
-        }
-
+        foundNext = false;
     }
 
     @Override
     public ITupleReference getTuple() {
-        if (includeMutableComponent) {
-            return frameTuple;
-        } else {
-            return super.getTuple();
-        }
-
+        return frameTuple;
     }
 
     @Override

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1630
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I299b1858b7875ffc116f8f3115d319fe7b53a537
Gerrit-PatchSet: 11
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Taewoo Kim <wangs...@gmail.com>
Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mb...@apache.org>
Gerrit-Reviewer: Taewoo Kim <wangs...@gmail.com>
Gerrit-Reviewer: Yingyi Bu <buyin...@gmail.com>

Reply via email to