Till Westmann has submitted this change and it was merged.

Change subject: [ASTERIXDB-2205][STO] Maintain includeMutableComponent correctly
......................................................................
[ASTERIXDB-2205][STO] Maintain includeMutableComponent correctly - user model changes: no - storage format changes: no - interface changes: no Details: - This change fixes ASTERIXDB-2205. The root cause for ASTERIXDB-2205 was that the value of includeMutableComponent is not maintained correctly. Change-Id: Ic08a9372c608d6de960e1419899530aa55aa72e0 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2243 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Reviewed-by: Till Westmann <[email protected]> --- M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java 2 files changed, 57 insertions(+), 63 deletions(-) Approvals: Anon. E. Moose #1000171: Till Westmann: Looks good to me, approved Jenkins: Verified; ; Verified Objections: Jenkins: Violations found diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java index 3e14fb9..876bc6d 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java @@ -105,53 +105,46 @@ while (!outputPriorityQueue.isEmpty() || needPushElementIntoQueue) { if (!outputPriorityQueue.isEmpty()) { PriorityQueueElement queueHead = outputPriorityQueue.peek(); - if (canCallProceed) { - // if there are no memory components. 
no need to lock at all - // since whatever the search reads will never changes - if (includeMutableComponent) { - if (!searchCallback.proceed(queueHead.getTuple())) { - // In case proceed() fails and there is an in-memory component, - // we can't simply use this element since there might be a change. - PriorityQueueElement mutableElement = remove(outputPriorityQueue, 0); - if (mutableElement != null) { - // Copies the current queue head - if (tupleBuilder == null) { - tupleBuilder = new ArrayTupleBuilder(cmp.getKeyFieldCount()); - } - TupleUtils.copyTuple(tupleBuilder, queueHead.getTuple(), cmp.getKeyFieldCount()); - copyTuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray()); - // Unlatches/unpins the leaf page of the index. - rangeCursors[0].reset(); - // Reconcile. - searchCallback.reconcile(copyTuple); - // Re-traverses the index. - reusablePred.setLowKey(copyTuple, true); - btreeAccessors[0].search(rangeCursors[0], reusablePred); - //------ - includeMutableComponent = pushIntoQueueFromCursorAndReplaceThisElement(mutableElement); - // now that we have completed the search and we have latches over the pages, - // it is safe to complete the operation.. but as per the API of the callback - // we only complete if we're producing this tuple - // get head again - queueHead = outputPriorityQueue.peek(); - /* - * We need to restart in one of two cases: - * 1. no more elements in the priority queue. - * 2. the key of the head has changed (which means we need to call proceed) - */ - if (queueHead == null || cmp.compare(copyTuple, queueHead.getTuple()) != 0) { - // cancel since we're not continuing - searchCallback.cancel(copyTuple); - continue; - } - searchCallback.complete(copyTuple); - // it is safe to proceed now - } else { - // There are no more elements in the memory component.. 
can safely skip locking for the - // remaining operations - includeMutableComponent = false; - } + if (canCallProceed && includeMutableComponent && !searchCallback.proceed(queueHead.getTuple())) { + // In case proceed() fails and there is an in-memory component, + // we can't simply use this element since there might be a change. + PriorityQueueElement mutableElement = remove(outputPriorityQueue, 0); + if (mutableElement != null) { + // Copies the current queue head + if (tupleBuilder == null) { + tupleBuilder = new ArrayTupleBuilder(cmp.getKeyFieldCount()); } + TupleUtils.copyTuple(tupleBuilder, queueHead.getTuple(), cmp.getKeyFieldCount()); + copyTuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray()); + // Unlatches/unpins the leaf page of the index. + rangeCursors[0].reset(); + // Reconcile. + searchCallback.reconcile(copyTuple); + // Re-traverses the index. + reusablePred.setLowKey(copyTuple, true); + btreeAccessors[0].search(rangeCursors[0], reusablePred); + pushIntoQueueFromCursorAndReplaceThisElement(mutableElement); + // now that we have completed the search and we have latches over the pages, + // it is safe to complete the operation.. but as per the API of the callback + // we only complete if we're producing this tuple + // get head again + queueHead = outputPriorityQueue.peek(); + /* + * We need to restart in one of two cases: + * 1. no more elements in the priority queue. + * 2. the key of the head has changed (which means we need to call proceed) + */ + if (queueHead == null || cmp.compare(copyTuple, queueHead.getTuple()) != 0) { + // cancel since we're not continuing + searchCallback.cancel(copyTuple); + continue; + } + searchCallback.complete(copyTuple); + // it is safe to proceed now + } else { + // There are no more elements in the memory component.. 
can safely skip locking for the + // remaining operations + includeMutableComponent = false; } } @@ -217,19 +210,21 @@ opCtx.getIndex().getHarness().replaceMemoryComponentsWithDiskComponents(getOpCtx(), replaceFrom); // redo the search on the new component for (int i = replaceFrom; i < switchRequest.length; i++) { - if (switchRequest[i] && switchedElements[i] != null) { - copyTuple.reset(switchComponentTupleBuilders[i].getFieldEndOffsets(), - switchComponentTupleBuilders[i].getByteArray()); - reusablePred.setLowKey(copyTuple, true); - rangeCursors[i].reset(); + if (switchRequest[i]) { ILSMComponent component = operationalComponents.get(i); BTree btree = (BTree) component.getIndex(); if (i == 0 && component.getType() != LSMComponentType.MEMORY) { includeMutableComponent = false; } - btreeAccessors[i].reset(btree, NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - btreeAccessors[i].search(rangeCursors[i], reusablePred); - pushIntoQueueFromCursorAndReplaceThisElement(switchedElements[i]); + if (switchedElements[i] != null) { + copyTuple.reset(switchComponentTupleBuilders[i].getFieldEndOffsets(), + switchComponentTupleBuilders[i].getByteArray()); + reusablePred.setLowKey(copyTuple, true); + rangeCursors[i].reset(); + btreeAccessors[i].reset(btree, NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); + btreeAccessors[i].search(rangeCursors[i], reusablePred); + pushIntoQueueFromCursorAndReplaceThisElement(switchedElements[i]); + } } switchRequest[i] = false; // any failed switch makes further switches pointless @@ -305,7 +300,7 @@ // Re-traverses the index. 
reusablePred.setLowKey(copyTuple, true); btreeAccessors[0].search(rangeCursors[0], reusablePred); - includeMutableComponent = pushIntoQueueFromCursorAndReplaceThisElement(mutableElement); + pushIntoQueueFromCursorAndReplaceThisElement(mutableElement); } } tupleFromMemoryComponentCount = 0; @@ -354,13 +349,11 @@ // re-use rangeCursors[i].reset(); } + if (component.getType() == LSMComponentType.MEMORY) { includeMutableComponent = true; - btree = (BTree) component.getIndex(); - } else { - btree = (BTree) component.getIndex(); } - + btree = (BTree) component.getIndex(); if (btreeAccessors[i] == null) { btreeAccessors[i] = btree.createAccessor(NoOpIndexAccessParameters.INSTANCE); } else { diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java index 17c681c..e37669e 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java @@ -194,16 +194,18 @@ return filter == null ? 
null : filter.getMaxTuple(); } - protected boolean pushIntoQueueFromCursorAndReplaceThisElement(PriorityQueueElement e) throws HyracksDataException { + protected void pushIntoQueueFromCursorAndReplaceThisElement(PriorityQueueElement e) throws HyracksDataException { int cursorIndex = e.getCursorIndex(); if (rangeCursors[cursorIndex].hasNext()) { rangeCursors[cursorIndex].next(); e.reset(rangeCursors[cursorIndex].getTuple()); outputPriorityQueue.offer(e); - return true; + return; } rangeCursors[cursorIndex].close(); - return false; + if (cursorIndex == 0) { + includeMutableComponent = false; + } } protected boolean isDeleted(PriorityQueueElement checkElement) throws HyracksDataException { @@ -324,5 +326,4 @@ throws HyracksDataException { return cmp.compare(tupleA, tupleB); } - } -- To view, visit https://asterix-gerrit.ics.uci.edu/2243 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: Ic08a9372c608d6de960e1419899530aa55aa72e0 Gerrit-PatchSet: 7 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Ian Maxon <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
