http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/api/IInvertedListCursor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/api/IInvertedListCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/api/IInvertedListCursor.java deleted file mode 100644 index e9b3f21..0000000 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/api/IInvertedListCursor.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hyracks.storage.am.lsm.invertedindex.api; - -import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; -import org.apache.hyracks.storage.common.MultiComparator; - -public interface IInvertedListCursor extends Comparable<IInvertedListCursor> { - public void reset(int startPageId, int endPageId, int startOff, int numElements); - - public void pinPages() throws HyracksDataException; - - public void unpinPages() throws HyracksDataException; - - public boolean hasNext() throws HyracksDataException; - - public void next() throws HyracksDataException; - - public ITupleReference getTuple(); - - // getters - public int size() throws HyracksDataException; - - public int getStartPageId(); - - public int getEndPageId(); - - public int getStartOff(); - - public boolean containsKey(ITupleReference searchTuple, MultiComparator invListCmp) throws HyracksDataException; - - // for debugging - @SuppressWarnings("rawtypes") - public String printInvList(ISerializerDeserializer[] serdes) throws HyracksDataException; - - @SuppressWarnings("rawtypes") - public String printCurrentElement(ISerializerDeserializer[] serdes) throws HyracksDataException; -}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/api/IObjectFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/api/IObjectFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/api/IObjectFactory.java index 93b182d..fe735bc 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/api/IObjectFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/api/IObjectFactory.java @@ -19,6 +19,8 @@ package org.apache.hyracks.storage.am.lsm.invertedindex.api; +import org.apache.hyracks.api.exceptions.HyracksDataException; + public interface IObjectFactory<T> { - public T create(); + public T create() throws HyracksDataException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/api/IPartitionedInvertedIndex.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/api/IPartitionedInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/api/IPartitionedInvertedIndex.java index df8e6f0..fd80c00 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/api/IPartitionedInvertedIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/api/IPartitionedInvertedIndex.java @@ -19,16 +19,14 @@ package org.apache.hyracks.storage.am.lsm.invertedindex.api; -import java.util.List; - import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.common.api.IIndexOperationContext; import org.apache.hyracks.storage.am.lsm.invertedindex.search.InvertedListPartitions; public interface IPartitionedInvertedIndex { public boolean openInvertedListPartitionCursors(IInvertedIndexSearcher searcher, IIndexOperationContext ictx, - short numTokensLowerBound, short numTokensUpperBound, InvertedListPartitions invListPartitions, - List<IInvertedListCursor> cursorsOrderedByTokens) throws HyracksDataException; + short numTokensLowerBound, short numTokensUpperBound, InvertedListPartitions invListPartitions) + throws HyracksDataException; public boolean isEmpty(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/api/InvertedListCursor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/api/InvertedListCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/api/InvertedListCursor.java new file mode 100644 index 0000000..9db7dc8 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/api/InvertedListCursor.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.storage.am.lsm.invertedindex.api; + +import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +import org.apache.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexSearchCursorInitialState; +import org.apache.hyracks.storage.common.EnforcedIndexCursor; +import org.apache.hyracks.storage.common.ICursorInitialState; +import org.apache.hyracks.storage.common.ISearchPredicate; +import org.apache.hyracks.storage.common.MultiComparator; + +/** + * A cursor that reads an inverted list. + */ +public abstract class InvertedListCursor extends EnforcedIndexCursor implements Comparable<InvertedListCursor> { + + /** + * Opens an inverted list cursor. + */ + protected void doOpen(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException { + // If the given cursor state has page ids and the number of elements for the given inverted list, + // this should be set. Otherwise (for in-memory cursor), doesn't need to do anything. + int invListStartPageId = LSMInvertedIndexSearchCursorInitialState.INVALID_VALUE; + int invListEndPageId = LSMInvertedIndexSearchCursorInitialState.INVALID_VALUE; + int invListStartOffset = LSMInvertedIndexSearchCursorInitialState.INVALID_VALUE; + int invListNumElements = LSMInvertedIndexSearchCursorInitialState.INVALID_VALUE; + if (initialState != null && initialState instanceof LSMInvertedIndexSearchCursorInitialState) { + LSMInvertedIndexSearchCursorInitialState invIndexInitialState = + (LSMInvertedIndexSearchCursorInitialState) initialState; + invListStartPageId = invIndexInitialState.getInvListStartPageId(); + invListEndPageId = invIndexInitialState.getInvListEndPageId(); + invListStartOffset = invIndexInitialState.getInvListStartOffset(); + invListNumElements = invIndexInitialState.getInvListNumElements(); + } + if (invListNumElements != LSMInvertedIndexSearchCursorInitialState.INVALID_VALUE) { + setInvListInfo(invListStartPageId, invListEndPageId, invListStartOffset, invListNumElements); + } + } + + /** + * Sets the disk-based inverted list information such as page ids and the number of elements + * for the given inverted list. + */ + protected abstract void setInvListInfo(int startPageId, int endPageId, int startOff, int numElements) + throws HyracksDataException; + + /** + * Conducts any operation that is required before loading pages. + */ + public abstract void prepareLoadPages() throws HyracksDataException; + + /** + * Loads one or more pages to memory. + */ + public abstract void loadPages() throws HyracksDataException; + + /** + * Unloads currently loaded pages in the memory. + */ + public abstract void unloadPages() throws HyracksDataException; + + /** + * Gets the cardinality of elements in the cursor. + */ + public abstract int size() throws HyracksDataException; + + /** + * Checks whether the given tuple is contained in the cursor. + */ + public abstract boolean containsKey(ITupleReference searchTuple, MultiComparator invListCmp) + throws HyracksDataException; + + /** + * Prints all elements in the cursor (debug method). + */ + @SuppressWarnings("rawtypes") + public abstract String printInvList(ISerializerDeserializer[] serdes) throws HyracksDataException; + + /** + * Prints the current element in the cursor (debug method). + */ + @SuppressWarnings("rawtypes") + public abstract String printCurrentElement(ISerializerDeserializer[] serdes) throws HyracksDataException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorDescriptor.java index 35c0dec..13a649d 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorDescriptor.java @@ -49,13 +49,16 @@ public class LSMInvertedIndexSearchOperatorDescriptor extends AbstractSingleActi private final IMissingWriterFactory missingWriterFactory; private final ISearchOperationCallbackFactory searchCallbackFactory; private final int numOfFields; + // the maximum number of frames that this inverted-index-search can use + private final int frameLimit; public LSMInvertedIndexSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc, int queryField, IIndexDataflowHelperFactory indexHelperFactory, IBinaryTokenizerFactory queryTokenizerFactory, IInvertedIndexSearchModifierFactory searchModifierFactory, boolean retainInput, boolean retainMissing, IMissingWriterFactory missingWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory, int[] minFilterFieldIndexes, - int[] maxFilterFieldIndexes, boolean isFullTextSearchQuery, int numOfFields, boolean appendIndexFilter) { + int[] maxFilterFieldIndexes, boolean isFullTextSearchQuery, int numOfFields, boolean appendIndexFilter, + int frameLimit) { super(spec, 1, 1); this.indexHelperFactory = indexHelperFactory; this.queryTokenizerFactory = queryTokenizerFactory; @@ -71,6 +74,7 @@ public class LSMInvertedIndexSearchOperatorDescriptor extends AbstractSingleActi this.appendIndexFilter = appendIndexFilter; this.numOfFields = numOfFields; this.outRecDescs[0] = outRecDesc; + this.frameLimit = frameLimit; } @Override @@ -81,6 +85,6 @@ public class LSMInvertedIndexSearchOperatorDescriptor extends AbstractSingleActi recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), partition, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory, retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, searchModifier, queryTokenizerFactory, queryField, isFullTextSearchQuery, - numOfFields, appendIndexFilter); + numOfFields, appendIndexFilter, frameLimit); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorNodePushable.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorNodePushable.java index ed7a4a6..a27dea7 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorNodePushable.java @@ -23,13 +23,20 @@ import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.util.HyracksConstants; import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference; +import org.apache.hyracks.dataflow.common.utils.TaskUtil; +import org.apache.hyracks.dataflow.std.buffermanager.DeallocatableFramePool; +import org.apache.hyracks.dataflow.std.buffermanager.FramePoolBackedFrameBufferManager; +import org.apache.hyracks.dataflow.std.buffermanager.IDeallocatableFramePool; +import org.apache.hyracks.dataflow.std.buffermanager.ISimpleFrameBufferManager; import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory; import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; import org.apache.hyracks.storage.am.common.dataflow.IndexSearchOperatorNodePushable; import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearchModifier; import org.apache.hyracks.storage.am.lsm.invertedindex.search.InvertedIndexSearchPredicate; import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory; +import org.apache.hyracks.storage.common.IIndexAccessParameters; import org.apache.hyracks.storage.common.ISearchPredicate; public class LSMInvertedIndexSearchOperatorNodePushable extends IndexSearchOperatorNodePushable { @@ -41,14 +48,17 @@ public class LSMInvertedIndexSearchOperatorNodePushable extends IndexSearchOpera // Keeps the information whether the given query is a full-text search or not. // We need to have this information to stop the search process since we don't allow a phrase search yet. protected final boolean isFullTextSearchQuery; + // Budget-constrained buffer manager for conducting the search operation + protected final ISimpleFrameBufferManager bufferManagerForSearch; + protected final IDeallocatableFramePool framePool; public LSMInvertedIndexSearchOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc, int partition, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, IIndexDataflowHelperFactory indexHelperFactory, boolean retainInput, boolean retainMissing, IMissingWriterFactory missingWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory, IInvertedIndexSearchModifier searchModifier, IBinaryTokenizerFactory binaryTokenizerFactory, - int queryFieldIndex, boolean isFullTextSearchQuery, int numOfFields, boolean appendIndexFilter) - throws HyracksDataException { + int queryFieldIndex, boolean isFullTextSearchQuery, int numOfFields, boolean appendIndexFilter, + int frameLimit) throws HyracksDataException { super(ctx, inputRecDesc, partition, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory, retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, appendIndexFilter); this.searchModifier = searchModifier; @@ -60,6 +70,11 @@ public class LSMInvertedIndexSearchOperatorNodePushable extends IndexSearchOpera this.frameTuple = new FrameTupleReference(); } this.numOfFields = numOfFields; + // Intermediate and final search result will use this buffer manager to get frames. + this.framePool = new DeallocatableFramePool(ctx, frameLimit * ctx.getInitialFrameSize()); + this.bufferManagerForSearch = new FramePoolBackedFrameBufferManager(framePool); + // Keep the buffer manager in the hyracks context so that the search process can get it via the context. + TaskUtil.put(HyracksConstants.INVERTED_INDEX_SEARCH_FRAME_MANAGER, bufferManagerForSearch, ctx); } @Override @@ -87,4 +102,9 @@ public class LSMInvertedIndexSearchOperatorNodePushable extends IndexSearchOpera protected int getFieldCount() { return numOfFields; } + + @Override + protected void addAdditionalIndexAccessorParams(IIndexAccessParameters iap) throws HyracksDataException { + iap.getParameters().put(HyracksConstants.HYRACKS_TASK_CONTEXT, ctx); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java index 1beff71..a395e67 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java @@ -69,8 +69,6 @@ import org.apache.hyracks.storage.common.ICursorInitialState; import org.apache.hyracks.storage.common.IIndexAccessParameters; import org.apache.hyracks.storage.common.IIndexAccessor; import org.apache.hyracks.storage.common.IIndexCursor; -import org.apache.hyracks.storage.common.IModificationOperationCallback; -import org.apache.hyracks.storage.common.ISearchOperationCallback; import org.apache.hyracks.storage.common.ISearchPredicate; import org.apache.hyracks.storage.common.MultiComparator; import org.apache.hyracks.storage.common.buffercache.IBufferCache; @@ -193,6 +191,7 @@ public class LSMInvertedIndex extends AbstractLSMIndex implements IInvertedIndex @Override public void search(ILSMIndexOperationContext ictx, IIndexCursor cursor, ISearchPredicate pred) throws HyracksDataException { + LSMInvertedIndexOpContext ctx = (LSMInvertedIndexOpContext) ictx; List<ILSMComponent> operationalComponents = ictx.getComponentHolder(); int numComponents = operationalComponents.size(); boolean includeMutableComponent = false; @@ -203,15 +202,13 @@ public class LSMInvertedIndex extends AbstractLSMIndex implements IInvertedIndex ILSMComponent component = operationalComponents.get(i); if (component.getType() == LSMComponentType.MEMORY) { includeMutableComponent = true; - IIndexAccessor invIndexAccessor = - component.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE); + IIndexAccessor invIndexAccessor = component.getIndex().createAccessor(ctx.getIndexAccessParameters()); indexAccessors.add(invIndexAccessor); IIndexAccessor deletedKeysAccessor = ((LSMInvertedIndexMemoryComponent) component).getBuddyIndex() .createAccessor(NoOpIndexAccessParameters.INSTANCE); deletedKeysBTreeAccessors.add(deletedKeysAccessor); } else { - IIndexAccessor invIndexAccessor = - component.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE); + IIndexAccessor invIndexAccessor = component.getIndex().createAccessor(ctx.getIndexAccessParameters()); indexAccessors.add(invIndexAccessor); IIndexAccessor deletedKeysAccessor = ((LSMInvertedIndexDiskComponent) component).getBuddyIndex() .createAccessor(NoOpIndexAccessParameters.INSTANCE); @@ -436,15 +433,13 @@ public class LSMInvertedIndex extends AbstractLSMIndex implements IInvertedIndex @Override public ILSMIndexAccessor createAccessor(IIndexAccessParameters iap) throws HyracksDataException { - return new LSMInvertedIndexAccessor(getHarness(), - createOpContext(iap.getModificationCallback(), iap.getSearchOperationCallback())); + return new LSMInvertedIndexAccessor(getHarness(), createOpContext(iap)); } @Override - protected LSMInvertedIndexOpContext createOpContext(IModificationOperationCallback modificationCallback, - ISearchOperationCallback searchCallback) throws HyracksDataException { - return new LSMInvertedIndexOpContext(this, memoryComponents, modificationCallback, searchCallback, - invertedIndexFieldsForNonBulkLoadOps, filterFieldsForNonBulkLoadOps, getFilterCmpFactories(), tracer); + protected LSMInvertedIndexOpContext createOpContext(IIndexAccessParameters iap) throws HyracksDataException { + return new LSMInvertedIndexOpContext(this, memoryComponents, iap, invertedIndexFieldsForNonBulkLoadOps, + filterFieldsForNonBulkLoadOps, getFilterCmpFactories(), tracer); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java index c33e2ce..d7408ff 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java @@ -35,7 +35,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexAccessor; -import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor; +import org.apache.hyracks.storage.am.lsm.invertedindex.api.InvertedListCursor; import org.apache.hyracks.storage.common.IIndexCursor; import org.apache.hyracks.storage.common.ISearchPredicate; @@ -178,12 +178,12 @@ public class LSMInvertedIndexAccessor implements ILSMIndexAccessor, IInvertedInd } @Override - public IInvertedListCursor createInvertedListCursor() { + public InvertedListCursor createInvertedListCursor() { throw new UnsupportedOperationException("Cannot create inverted list cursor on lsm inverted index."); } @Override - public void openInvertedListCursor(IInvertedListCursor listCursor, ITupleReference searchKey) + public void openInvertedListCursor(InvertedListCursor listCursor, ITupleReference searchKey) throws HyracksDataException { throw new UnsupportedOperationException("Cannot open inverted list cursor on lsm inverted index."); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java index 1fe4bd2..e7a725e 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java @@ -30,9 +30,8 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexOperationContext; import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexAccessor; +import org.apache.hyracks.storage.common.IIndexAccessParameters; import org.apache.hyracks.storage.common.IIndexAccessor; -import org.apache.hyracks.storage.common.IModificationOperationCallback; -import org.apache.hyracks.storage.common.ISearchOperationCallback; import org.apache.hyracks.util.trace.ITracer; public class LSMInvertedIndexOpContext extends AbstractLSMIndexOperationContext { @@ -47,21 +46,22 @@ public class LSMInvertedIndexOpContext extends AbstractLSMIndexOperationContext private IIndexAccessor[] deletedKeysBTreeAccessors; private IInvertedIndexAccessor currentMutableInvIndexAccessors; private IIndexAccessor currentDeletedKeysBTreeAccessors; + // To keep the buffer frame manager in case of a search + private IIndexAccessParameters iap; private boolean destroyed = false; public LSMInvertedIndexOpContext(ILSMIndex index, List<ILSMMemoryComponent> mutableComponents, - IModificationOperationCallback modificationCallback, ISearchOperationCallback searchCallback, - int[] invertedIndexFields, int[] filterFields, IBinaryComparatorFactory[] filterComparatorFactories, - ITracer tracer) throws HyracksDataException { - super(index, invertedIndexFields, filterFields, filterComparatorFactories, searchCallback, modificationCallback, - tracer); + IIndexAccessParameters iap, int[] invertedIndexFields, int[] filterFields, + IBinaryComparatorFactory[] filterComparatorFactories, ITracer tracer) throws HyracksDataException { + super(index, invertedIndexFields, filterFields, filterComparatorFactories, iap.getSearchOperationCallback(), + iap.getModificationCallback(), tracer); mutableInvIndexAccessors = new IInvertedIndexAccessor[mutableComponents.size()]; deletedKeysBTreeAccessors = new IIndexAccessor[mutableComponents.size()]; for (int i = 0; i < mutableComponents.size(); i++) { LSMInvertedIndexMemoryComponent mutableComponent = (LSMInvertedIndexMemoryComponent) mutableComponents.get(i); if (allFields != null) { - mutableInvIndexAccessors[i] = mutableComponent.getIndex().createAccessor(allFields); + mutableInvIndexAccessors[i] = mutableComponent.getIndex().createAccessor(iap, allFields); } else { mutableInvIndexAccessors[i] = mutableComponent.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE); @@ -77,6 +77,7 @@ public class LSMInvertedIndexOpContext extends AbstractLSMIndexOperationContext keyFieldPermutation[i] = NUM_DOCUMENT_FIELDS + i; } keysOnlyTuple = new PermutingTupleReference(keyFieldPermutation); + this.iap = iap; } @Override @@ -97,6 +98,10 @@ public class LSMInvertedIndexOpContext extends AbstractLSMIndexOperationContext return currentDeletedKeysBTreeAccessors; } + public IIndexAccessParameters getIndexAccessParameters() { + return iap; + } + @Override public void destroy() throws HyracksDataException { if (destroyed) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursorInitialState.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursorInitialState.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursorInitialState.java index 93ede6d..6e35e07 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursorInitialState.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursorInitialState.java @@ -35,6 +35,8 @@ import org.apache.hyracks.storage.common.buffercache.ICachedPage; public class LSMInvertedIndexSearchCursorInitialState implements ICursorInitialState { + public static final int INVALID_VALUE = -1; + private final boolean includeMemComponent; private final ILSMHarness lsmHarness; private final List<IIndexAccessor> indexAccessors; @@ -48,6 +50,17 @@ public class LSMInvertedIndexSearchCursorInitialState implements ICursorInitialS private final List<ILSMComponent> operationalComponents; + // For disk-based inverted list cursors + private int invListStartPageId = INVALID_VALUE; + private int invListEndPageId = INVALID_VALUE; + private int invListStartOffset = INVALID_VALUE; + private int invListNumElements = INVALID_VALUE; + + public LSMInvertedIndexSearchCursorInitialState() { + this(null, null, null, null, null, null, false, null, null); + resetInvertedListInfo(); + } + public LSMInvertedIndexSearchCursorInitialState(final MultiComparator keyCmp, PermutingTupleReference keysOnlyTuple, List<IIndexAccessor> indexAccessors, List<IIndexAccessor> deletedKeysBTreeAccessors, ITreeIndexFrameFactory deletedKeysBtreeLeafFrameFactory, IIndexOperationContext ctx, @@ -61,7 +74,8 @@ public class LSMInvertedIndexSearchCursorInitialState implements ICursorInitialS this.operationalComponents = operationalComponents; this.lsmHarness = lsmHarness; this.ctx = (LSMInvertedIndexOpContext) ctx; - this.searchCallback = this.ctx.getSearchOperationCallback(); + this.searchCallback = ctx != null ? this.ctx.getSearchOperationCallback() : null; + resetInvertedListInfo(); } @Override @@ -128,4 +142,35 @@ public class LSMInvertedIndexSearchCursorInitialState implements ICursorInitialS public PermutingTupleReference getKeysOnlyTuple() { return keysOnlyTuple; } + + public void setInvertedListInfo(int invListStartPageId, int invListEndPageId, int invListStartOffset, + int invListNumElements) { + this.invListStartPageId = invListStartPageId; + this.invListEndPageId = invListEndPageId; + this.invListStartOffset = invListStartOffset; + this.invListNumElements = invListNumElements; + } + + public int getInvListStartPageId() { + return invListStartPageId; + } + + public int getInvListEndPageId() { + return invListEndPageId; + } + + public int getInvListStartOffset() { + return invListStartOffset; + } + + public int getInvListNumElements() { + return invListNumElements; + } + + private void resetInvertedListInfo() { + invListStartPageId = INVALID_VALUE; + invListEndPageId = INVALID_VALUE; + invListStartOffset = INVALID_VALUE; + invListNumElements = INVALID_VALUE; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java index 641ca3c..e74733b 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java @@ -18,11 +18,13 @@ */ package org.apache.hyracks.storage.am.lsm.invertedindex.inmemory; +import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.ITypeTraits; import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.api.util.HyracksConstants; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.am.btree.frames.BTreeLeafFrameType; import org.apache.hyracks.storage.am.btree.impls.BTree; @@ -32,7 +34,7 @@ import org.apache.hyracks.storage.am.common.api.IIndexOperationContext; import org.apache.hyracks.storage.am.common.api.IPageManager; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInPlaceInvertedIndex; -import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor; +import org.apache.hyracks.storage.am.lsm.invertedindex.api.InvertedListCursor; import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory; import org.apache.hyracks.storage.common.IIndexAccessParameters; import org.apache.hyracks.storage.common.IIndexBulkLoader; @@ -150,29 +152,41 @@ public class InMemoryInvertedIndex implements IInPlaceInvertedIndex { } @Override - public IInvertedListCursor createInvertedListCursor() { + public InvertedListCursor createInvertedListCursor(IHyracksTaskContext ctx) { return new InMemoryInvertedListCursor(invListTypeTraits.length, tokenTypeTraits.length); } @Override - public void openInvertedListCursor(IInvertedListCursor listCursor, ITupleReference searchKey, + public InvertedListCursor createInvertedListRangeSearchCursor() { + // An in-memory index does not have a separate inverted list. + // Therefore, a different range-search cursor for an inverted list is not required. + return createInvertedListCursor(null); + } + + @Override + public void openInvertedListCursor(InvertedListCursor listCursor, ITupleReference searchKey, IIndexOperationContext ictx) throws HyracksDataException { InMemoryInvertedIndexOpContext ctx = (InMemoryInvertedIndexOpContext) ictx; ctx.setOperation(IndexOperation.SEARCH); InMemoryInvertedListCursor inMemListCursor = (InMemoryInvertedListCursor) listCursor; inMemListCursor.prepare(ctx.getBtreeAccessor(), ctx.getBtreePred(), ctx.getTokenFieldsCmp(), ctx.getBtreeCmp()); inMemListCursor.reset(searchKey); + // Makes the cursor state to OPENED + inMemListCursor.open(null, null); } @Override public InMemoryInvertedIndexAccessor createAccessor(IIndexAccessParameters iap) throws HyracksDataException { return new InMemoryInvertedIndexAccessor(this, - new InMemoryInvertedIndexOpContext(btree, tokenCmpFactories, tokenizerFactory)); + new InMemoryInvertedIndexOpContext(btree, tokenCmpFactories, tokenizerFactory), + (IHyracksTaskContext) iap.getParameters().get(HyracksConstants.HYRACKS_TASK_CONTEXT)); } - public InMemoryInvertedIndexAccessor createAccessor(int[] nonIndexFields) throws HyracksDataException { + public InMemoryInvertedIndexAccessor createAccessor(IIndexAccessParameters iap, int[] nonIndexFields) + throws HyracksDataException { return new InMemoryInvertedIndexAccessor(this, - new InMemoryInvertedIndexOpContext(btree, tokenCmpFactories, tokenizerFactory), nonIndexFields); + new InMemoryInvertedIndexOpContext(btree, tokenCmpFactories, tokenizerFactory), nonIndexFields, + (IHyracksTaskContext) iap.getParameters().get(HyracksConstants.HYRACKS_TASK_CONTEXT)); } @Override @@ -219,4 +233,5 @@ public class InMemoryInvertedIndex implements IInPlaceInvertedIndex { public void purge() throws HyracksDataException { btree.purge(); } + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndexAccessor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndexAccessor.java index 0795a4e..2a35301 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndexAccessor.java @@ -19,7 +19,7 @@ package org.apache.hyracks.storage.am.lsm.invertedindex.inmemory; -import org.apache.hyracks.api.context.IHyracksCommonContext; +import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.am.btree.api.IBTreeLeafFrame; @@ -31,8 +31,7 @@ import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexAccessor; import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearcher; -import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor; -import org.apache.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndex.DefaultHyracksCommonContext; +import org.apache.hyracks.storage.am.lsm.invertedindex.api.InvertedListCursor; import org.apache.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndexSearchCursor; import org.apache.hyracks.storage.am.lsm.invertedindex.search.InvertedIndexSearchPredicate; import org.apache.hyracks.storage.am.lsm.invertedindex.search.TOccurrenceSearcher; @@ -40,27 +39,29 @@ import org.apache.hyracks.storage.common.IIndexCursor; import org.apache.hyracks.storage.common.ISearchPredicate; public class InMemoryInvertedIndexAccessor implements IInvertedIndexAccessor { - // TODO: This ctx needs to go away. - protected final IHyracksCommonContext hyracksCtx = new DefaultHyracksCommonContext(); - protected final IInvertedIndexSearcher searcher; + protected final IHyracksTaskContext ctx; + protected IInvertedIndexSearcher searcher; protected IIndexOperationContext opCtx; protected InMemoryInvertedIndex index; protected BTreeAccessor btreeAccessor; private boolean destroyed = false; - public InMemoryInvertedIndexAccessor(InMemoryInvertedIndex index, IIndexOperationContext opCtx) - throws HyracksDataException { + public InMemoryInvertedIndexAccessor(InMemoryInvertedIndex index, IIndexOperationContext opCtx, + IHyracksTaskContext ctx) throws HyracksDataException { + this.ctx = ctx; this.opCtx = opCtx; this.index = index; - this.searcher = createSearcher(); + // Searcher will be initialized when conducting an actual search. + this.searcher = null; this.btreeAccessor = index.getBTree().createAccessor(NoOpIndexAccessParameters.INSTANCE); } public InMemoryInvertedIndexAccessor(InMemoryInvertedIndex index, IIndexOperationContext opCtx, - int[] nonIndexFields) throws HyracksDataException { + int[] nonIndexFields, IHyracksTaskContext ctx) throws HyracksDataException { + this.ctx = ctx; this.opCtx = opCtx; this.index = index; - this.searcher = createSearcher(); + this.searcher = null; this.btreeAccessor = index.getBTree().createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE, nonIndexFields); } @@ -78,22 +79,28 @@ public class InMemoryInvertedIndexAccessor implements IInvertedIndexAccessor { } @Override - public IIndexCursor createSearchCursor(boolean exclusive) { - return new OnDiskInvertedIndexSearchCursor(searcher, index.getInvListTypeTraits().length); + public IIndexCursor createSearchCursor(boolean exclusive) throws HyracksDataException { + if (searcher == null) { + searcher = createSearcher(); + } + return new OnDiskInvertedIndexSearchCursor(searcher); } @Override public void search(IIndexCursor cursor, ISearchPredicate searchPred) throws HyracksDataException { - searcher.search((OnDiskInvertedIndexSearchCursor) cursor, (InvertedIndexSearchPredicate) searchPred, opCtx); + if (searcher == null) { + searcher = createSearcher(); + } + searcher.search(cursor, (InvertedIndexSearchPredicate) searchPred, opCtx); } @Override - public IInvertedListCursor createInvertedListCursor() { - return index.createInvertedListCursor(); + public InvertedListCursor createInvertedListCursor() { + return index.createInvertedListCursor(ctx); } @Override - public void openInvertedListCursor(IInvertedListCursor listCursor, ITupleReference searchKey) + public void openInvertedListCursor(InvertedListCursor listCursor, ITupleReference searchKey) throws HyracksDataException { index.openInvertedListCursor(listCursor, searchKey, opCtx); } @@ -124,7 +131,10 @@ public class InMemoryInvertedIndexAccessor implements IInvertedIndexAccessor { } protected IInvertedIndexSearcher createSearcher() throws HyracksDataException { - return new TOccurrenceSearcher(hyracksCtx, index); + if (ctx != null) { + return new TOccurrenceSearcher(index, ctx); + } + return null; } public void resetLogTuple(ITupleReference newTuple) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedListCursor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedListCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedListCursor.java index b2660a4..085f8d5 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedListCursor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedListCursor.java @@ -33,11 +33,11 @@ import org.apache.hyracks.storage.am.btree.impls.BTree.BTreeAccessor; import org.apache.hyracks.storage.am.btree.impls.RangePredicate; import org.apache.hyracks.storage.am.common.tuples.ConcatenatingTupleReference; import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference; -import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor; +import org.apache.hyracks.storage.am.lsm.invertedindex.api.InvertedListCursor; import org.apache.hyracks.storage.common.IIndexCursor; import org.apache.hyracks.storage.common.MultiComparator; -public class InMemoryInvertedListCursor implements IInvertedListCursor { +public class InMemoryInvertedListCursor extends InvertedListCursor { private RangePredicate btreePred; private BTreeAccessor btreeAccessor; private IIndexCursor btreeCursor; @@ -80,7 +80,7 @@ public class InMemoryInvertedListCursor implements IInvertedListCursor { } @Override - public int compareTo(IInvertedListCursor cursor) { + public int compareTo(InvertedListCursor cursor) { try { return size() - cursor.size(); } catch (HyracksDataException hde) { @@ -100,12 +100,13 @@ public class InMemoryInvertedListCursor implements IInvertedListCursor { } @Override - public void reset(int startPageId, int endPageId, int startOff, int numElements) { - // Do nothing + protected void setInvListInfo(int startPageId, int endPageId, int startOff, int numElements) + throws HyracksDataException { + // no-op for this in-memory cursor - everything is in memory } @Override - public void pinPages() throws HyracksDataException { + public void loadPages() throws HyracksDataException { btreePred.setLowKeyComparator(tokenFieldsCmp); btreePred.setHighKeyComparator(tokenFieldsCmp); btreePred.setLowKey(tokenTuple, true); @@ -115,7 +116,7 @@ public class InMemoryInvertedListCursor implements IInvertedListCursor { } @Override - public void unpinPages() throws HyracksDataException { + public void unloadPages() throws HyracksDataException { if (cursorNeedsClose) { btreeCursor.close(); cursorNeedsClose = false; @@ -123,17 +124,17 @@ public class InMemoryInvertedListCursor implements IInvertedListCursor { } @Override - public boolean hasNext() throws HyracksDataException { + public boolean doHasNext() throws HyracksDataException { return btreeCursor.hasNext(); } @Override - public void next() throws HyracksDataException { + public void doNext() throws HyracksDataException { btreeCursor.next(); } @Override - public ITupleReference getTuple() { + public ITupleReference doGetTuple() { resultTuple.reset(btreeCursor.getTuple()); return resultTuple; } @@ -161,24 +162,9 @@ public class InMemoryInvertedListCursor implements IInvertedListCursor { } @Override - public int getStartPageId() { - return 0; - } - - @Override - public int getEndPageId() { - return 0; - } - - @Override - public int getStartOff() { - return 0; - } - - @Override public boolean containsKey(ITupleReference searchTuple, MultiComparator invListCmp) throws HyracksDataException { // Close cursor if necessary. - unpinPages(); + unloadPages(); btreeSearchTuple.addTuple(searchTuple); btreePred.setLowKeyComparator(btreeCmp); btreePred.setHighKeyComparator(btreeCmp); @@ -226,4 +212,20 @@ public class InMemoryInvertedListCursor implements IInvertedListCursor { public String printCurrentElement(ISerializerDeserializer[] serdes) throws HyracksDataException { return null; } + + @Override + public void prepareLoadPages() throws HyracksDataException { + // no-op for this in-memory cursor - no need to initialize a buffer + } + + @Override + public void doClose() throws HyracksDataException { + btreeCursor.close(); + } + + @Override + public void doDestroy() throws HyracksDataException { + btreeCursor.destroy(); + } + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndex.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndex.java index 5ddba98..986ceac 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndex.java @@ -18,7 +18,6 @@ */ package org.apache.hyracks.storage.am.lsm.invertedindex.inmemory; -import java.util.List; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; @@ -31,7 +30,6 @@ import org.apache.hyracks.storage.am.common.api.IIndexOperationContext; import org.apache.hyracks.storage.am.common.api.IPageManager; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearcher; -import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor; import org.apache.hyracks.storage.am.lsm.invertedindex.api.IPartitionedInvertedIndex; import org.apache.hyracks.storage.am.lsm.invertedindex.search.InvertedListPartitions; import org.apache.hyracks.storage.am.lsm.invertedindex.search.PartitionedTOccurrenceSearcher; @@ -89,20 +87,21 @@ public class PartitionedInMemoryInvertedIndex extends InMemoryInvertedIndex impl public PartitionedInMemoryInvertedIndexAccessor createAccessor(IIndexAccessParameters iap) throws HyracksDataException { return new PartitionedInMemoryInvertedIndexAccessor(this, - new PartitionedInMemoryInvertedIndexOpContext(btree, tokenCmpFactories, tokenizerFactory)); + new PartitionedInMemoryInvertedIndexOpContext(btree, tokenCmpFactories, tokenizerFactory), iap); } @Override - public PartitionedInMemoryInvertedIndexAccessor createAccessor(int[] nonIndexFields) throws HyracksDataException { + public PartitionedInMemoryInvertedIndexAccessor createAccessor(IIndexAccessParameters iap, int[] nonIndexFields) + throws HyracksDataException { return new PartitionedInMemoryInvertedIndexAccessor(this, new PartitionedInMemoryInvertedIndexOpContext(btree, tokenCmpFactories, tokenizerFactory), - nonIndexFields); + nonIndexFields, iap); } @Override public boolean openInvertedListPartitionCursors(IInvertedIndexSearcher searcher, IIndexOperationContext ictx, - short numTokensLowerBound, short numTokensUpperBound, InvertedListPartitions invListPartitions, - List<IInvertedListCursor> cursorsOrderedByTokens) throws HyracksDataException { + short numTokensLowerBound, short numTokensUpperBound, InvertedListPartitions invListPartitions) + throws HyracksDataException { short minPartitionIndex; short maxPartitionIndex; partitionIndexLock.readLock().lock(); @@ -140,6 +139,8 @@ public class PartitionedInMemoryInvertedIndex extends InMemoryInvertedIndex impl inMemListCursor.prepare(ctx.getBtreeAccessor(), ctx.getBtreePred(), ctx.getTokenFieldsCmp(), ctx.getBtreeCmp()); inMemListCursor.reset(searchKey); + // Makes the cursor state to OPENED + inMemListCursor.open(null, null); invListPartitions.addInvertedListCursor(inMemListCursor, i); } return true; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexAccessor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexAccessor.java index a4537a9..b5044d0 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexAccessor.java @@ -19,24 +19,28 @@ package org.apache.hyracks.storage.am.lsm.invertedindex.inmemory; +import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.util.HyracksConstants; import org.apache.hyracks.storage.am.common.api.IIndexOperationContext; import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearcher; import org.apache.hyracks.storage.am.lsm.invertedindex.search.PartitionedTOccurrenceSearcher; +import org.apache.hyracks.storage.common.IIndexAccessParameters; public class PartitionedInMemoryInvertedIndexAccessor extends InMemoryInvertedIndexAccessor { - public PartitionedInMemoryInvertedIndexAccessor(InMemoryInvertedIndex index, IIndexOperationContext opCtx) - throws HyracksDataException { - super(index, opCtx); + public PartitionedInMemoryInvertedIndexAccessor(InMemoryInvertedIndex index, IIndexOperationContext opCtx, + IIndexAccessParameters iap) throws HyracksDataException { + super(index, opCtx, (IHyracksTaskContext) iap.getParameters().get(HyracksConstants.HYRACKS_TASK_CONTEXT)); } public PartitionedInMemoryInvertedIndexAccessor(InMemoryInvertedIndex index, IIndexOperationContext opCtx, - int[] nonIndexFields) throws HyracksDataException { - super(index, opCtx, nonIndexFields); + int[] nonIndexFields, IIndexAccessParameters iap) throws HyracksDataException { + super(index, opCtx, nonIndexFields, + (IHyracksTaskContext) iap.getParameters().get(HyracksConstants.HYRACKS_TASK_CONTEXT)); } protected IInvertedIndexSearcher createSearcher() throws HyracksDataException { - return new PartitionedTOccurrenceSearcher(hyracksCtx, index); + return new PartitionedTOccurrenceSearcher(index, ctx); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeElementInvertedListCursor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeElementInvertedListCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeElementInvertedListCursor.java index eec2993..da3f079 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeElementInvertedListCursor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeElementInvertedListCursor.java @@ -22,152 +22,351 @@ package org.apache.hyracks.storage.am.lsm.invertedindex.ondisk; import java.io.ByteArrayInputStream; import java.io.DataInput; import java.io.DataInputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.dataflow.value.ITypeTraits; +import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.util.HyracksConstants; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; -import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor; +import org.apache.hyracks.dataflow.common.utils.TaskUtil; +import org.apache.hyracks.dataflow.std.buffermanager.ISimpleFrameBufferManager; +import org.apache.hyracks.storage.am.lsm.invertedindex.api.InvertedListCursor; import org.apache.hyracks.storage.common.MultiComparator; import org.apache.hyracks.storage.common.buffercache.IBufferCache; import org.apache.hyracks.storage.common.buffercache.ICachedPage; import org.apache.hyracks.storage.common.file.BufferedFileHandle; -public class FixedSizeElementInvertedListCursor implements IInvertedListCursor { +/** + * A cursor class that traverse an inverted list that consists of fixed-size elements on disk + * + */ +public class FixedSizeElementInvertedListCursor extends InvertedListCursor { private final IBufferCache bufferCache; private final int fileId; private final int elementSize; - private int currentElementIx; - private int currentOff; - private int currentPageIx; - + // for sequential scan + private int currentElementIxForScan; + private int currentOffsetForScan; + private int currentPageIxForScan; + // the whole range of the given inverted list private int startPageId; private int endPageId; private int startOff; private int numElements; + private int numPages; + // the current range of the loaded pages in memory + private int bufferStartPageId; + private int bufferEndPageId; + private int bufferStartElementIx; + private int bufferEndElementIx; + private int bufferNumLoadedPages; private final FixedSizeTupleReference tuple; - private ICachedPage[] pages = new ICachedPage[10]; + // The last element in the current range in memory + private final FixedSizeTupleReference bufferEndElementTuple; + private ICachedPage page; + // The last element index per page private int[] elementIndexes = new int[10]; - - private boolean pinned = false; - - public FixedSizeElementInvertedListCursor(IBufferCache bufferCache, int fileId, ITypeTraits[] invListFields) { + // buffer manager to conform to the memory budget + private final ISimpleFrameBufferManager bufferManagerForSearch; + private ArrayList<ByteBuffer> buffers; + private boolean moreBlocksToRead = true; + // The last searched element index (used for random traversal) + private int lastRandomSearchedElementIx; + + public FixedSizeElementInvertedListCursor(IBufferCache bufferCache, int fileId, ITypeTraits[] invListFields, + IHyracksTaskContext ctx) throws HyracksDataException { this.bufferCache = bufferCache; this.fileId = fileId; - this.currentElementIx = 0; - this.currentPageIx = 0; - - int tmp = 0; + int tmpSize = 0; for (int i = 0; i < invListFields.length; i++) { - tmp += invListFields[i].getFixedLength(); + tmpSize += invListFields[i].getFixedLength(); } - elementSize = tmp; - this.currentOff = -elementSize; + elementSize = tmpSize; + this.currentOffsetForScan = -elementSize; + this.currentElementIxForScan = 0; + this.currentPageIxForScan = 0; + this.bufferStartPageId = 0; + this.bufferEndPageId = 0; + this.bufferStartElementIx = 0; + this.bufferEndElementIx = 0; + this.bufferNumLoadedPages = 0; + this.lastRandomSearchedElementIx = 0; + this.moreBlocksToRead = true; this.tuple = new FixedSizeTupleReference(invListFields); + this.bufferEndElementTuple = new FixedSizeTupleReference(invListFields); + this.buffers = new ArrayList<ByteBuffer>(); + if (ctx == null) { + throw HyracksDataException.create(ErrorCode.CANNOT_CONTINUE_TEXT_SEARCH_HYRACKS_TASK_IS_NULL); + } + this.bufferManagerForSearch = TaskUtil.get(HyracksConstants.INVERTED_INDEX_SEARCH_FRAME_MANAGER, ctx); + if (bufferManagerForSearch == null) { + throw HyracksDataException.create(ErrorCode.CANNOT_CONTINUE_TEXT_SEARCH_BUFFER_MANAGER_IS_NULL); + } + } + + /** + * Tries to allocate enough buffers to read the inverted list at once. If memory budget is not enough, this method + * stops allocating buffers. + */ + private void allocateBuffers() throws HyracksDataException { + do { + ByteBuffer tmpBuffer = bufferManagerForSearch.acquireFrame(bufferCache.getPageSize()); + if (tmpBuffer == null) { + // Budget exhausted + break; + } + Arrays.fill(tmpBuffer.array(), (byte) 0); + buffers.add(tmpBuffer); + } while (buffers.size() < numPages); + // At least there should be one frame to load a page from disk. + if (buffers.isEmpty()) { + throw HyracksDataException.create(ErrorCode.NOT_ENOUGH_BUDGET_FOR_TEXTSEARCH, + FixedSizeElementInvertedListCursor.class.getName()); + } + } + + /** + * Deallocates all buffers. i.e. releases all buffers to the buffer manager. + */ + private void deallocateBuffers() throws HyracksDataException { + for (int i = 0; i < buffers.size(); i++) { + bufferManagerForSearch.releaseFrame(buffers.get(i)); + buffers.set(i, null); + } + buffers.clear(); } + /** + * Clears the contents of the buffers. + */ + private void clearBuffers() throws HyracksDataException { + for (int i = 0; i < buffers.size(); i++) { + Arrays.fill(buffers.get(i).array(), (byte) 0); + buffers.get(i).clear(); + } + } + + /** + * Checks whether there are more elements to return. This is usually used for a sequential scan. + */ @Override - public boolean hasNext() { - return currentElementIx < numElements; + public boolean doHasNext() { + return currentElementIxForScan < numElements; } + /** + * Returns the next element. + */ @Override - public void next() { - if (currentOff + 2 * elementSize > bufferCache.getPageSize()) { - currentPageIx++; - currentOff = 0; + public void doNext() throws HyracksDataException { + if (currentOffsetForScan + 2 * elementSize > bufferCache.getPageSize()) { + currentPageIxForScan++; + currentOffsetForScan = 0; } else { - currentOff += elementSize; + currentOffsetForScan += elementSize; + } + + // Needs to read the next block? + if (currentElementIxForScan > bufferEndElementIx && endPageId > bufferEndPageId) { + loadPages(); + currentOffsetForScan = 0; + } + + currentElementIxForScan++; + + tuple.reset(buffers.get(currentPageIxForScan).array(), currentOffsetForScan); + } + + /** + * Prepares buffers to load pages. This method should not be called during the open() + * since it tries to allocate all available frames. If there are multiple concurrently opened + * cursors (e.g., a partitioned inverted index), this will cause an issue. An assumption of this cursor is + * that no two cursors are accessed at the same time even though they can be opened together. + */ + @Override + public void prepareLoadPages() throws HyracksDataException { + // Resets the buffers if there is any. + clearBuffers(); + if (numPages > buffers.size()) { + allocateBuffers(); } - currentElementIx++; - tuple.reset(pages[currentPageIx].getBuffer().array(), currentOff); } + /** + * Reads a part of the inverted list into the working memory via the buffer cache. + * This method reads the inverted list until it fills the current buffers. + */ @Override - public void pinPages() throws HyracksDataException { - if (pinned) { + public void loadPages() throws HyracksDataException { + // Conducts a load. Based on the size of the buffers, it may be possible to read the entire list. + // Resets the start page ID to load. At this moment, the variable bufferEndPageId holds + // the last page ID where the previous loadPages() stopped. + bufferStartPageId = bufferEndPageId + 1; + if (bufferStartPageId > endPageId) { return; } - int pix = 0; - for (int i = startPageId; i <= endPageId; i++) { - pages[pix] = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, i), false); - pix++; + int currentBufferIdx = 0; + ByteBuffer tmpBuffer; + for (int i = bufferStartPageId; i <= endPageId; i++) { + page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, i), false); + // Copies the content to the buffer (working memory). + // Assumption: processing inverted list takes time; so, we don't want to keep them on the buffer cache. + // Rather, we utilize the assigned working memory (buffers). + tmpBuffer = page.getBuffer(); + tmpBuffer.rewind(); + buffers.get(currentBufferIdx).rewind(); + buffers.get(currentBufferIdx).put(tmpBuffer); + + currentBufferIdx++; + bufferCache.unpin(page); + bufferEndPageId = i; + + // Buffer full? + if (currentBufferIdx >= buffers.size()) { + break; + } } - pinned = true; + setBlockInfo(); } - @Override - public void unpinPages() throws HyracksDataException { - int numPages = endPageId - startPageId + 1; - for (int i = 0; i < numPages; i++) { - bufferCache.unpin(pages[i]); + /** + * Updates the information about this block. + */ + private void setBlockInfo() { + bufferNumLoadedPages = bufferEndPageId - bufferStartPageId + 1; + bufferStartElementIx = + bufferStartPageId == startPageId ? 0 : elementIndexes[bufferStartPageId - startPageId - 1] + 1; + lastRandomSearchedElementIx = bufferStartElementIx; + bufferEndElementIx = elementIndexes[bufferEndPageId - startPageId]; + // Gets the final element tuple in this block. + getElementAtIndex(bufferEndElementIx, bufferEndElementTuple); + currentPageIxForScan = 0; + currentOffsetForScan = bufferStartElementIx == 0 ? startOff - elementSize : -elementSize; + if (bufferEndPageId == endPageId) { + moreBlocksToRead = false; } - pinned = false; } - private void positionCursor(int elementIx) { - int numPages = endPageId - startPageId + 1; + /** + * Unloads the pages from the buffers (working memory). This will release all buffers. + */ + @Override + public void unloadPages() throws HyracksDataException { + // Deallocates the buffer pages + deallocateBuffers(); + } - currentPageIx = binarySearch(elementIndexes, 0, numPages, elementIx); + /** + * Checks whether the search tuple is greater than the last element in the current block of the cursor. + * If so, the cursor needs to load next block of the inverted list. + * + * @param searchTuple + * @param invListCmp + * @return true if the search tuple is greater than the last element in the current block of the cursor + * false if the search tuple is equal to or less than the last element in the current block of the cursor + * @throws HyracksDataException + */ + private boolean needToReadNextBlock(ITupleReference searchTuple, MultiComparator invListCmp) + throws HyracksDataException { + if (moreBlocksToRead && invListCmp.compare(searchTuple, bufferEndElementTuple) > 0) { + return true; + } + return false; + } + + /** + * Gets the tuple for the given element index. + */ + private void getElementAtIndex(int elementIx, FixedSizeTupleReference tuple) { + int currentPageIx = + binarySearch(elementIndexes, bufferStartPageId - startPageId, bufferNumLoadedPages, elementIx); if (currentPageIx < 0) { throw new IndexOutOfBoundsException( "Requested index: " + elementIx + " from array with numElements: " + numElements); } + int currentOff; if (currentPageIx == 0) { currentOff = startOff + elementIx * elementSize; } else { int relativeElementIx = elementIx - elementIndexes[currentPageIx - 1] - 1; currentOff = relativeElementIx * elementSize; } - - currentElementIx = elementIx; - tuple.reset(pages[currentPageIx].getBuffer().array(), currentOff); + // Gets the actual index in the buffers since buffers.size() can be smaller than the total number of pages. + int bufferIdx = currentPageIx % buffers.size(); + tuple.reset(buffers.get(bufferIdx).array(), currentOff); } + /** + * Checks whether the given tuple exists on this inverted list. This method is used when doing a random traversal. + */ @Override public boolean containsKey(ITupleReference searchTuple, MultiComparator invListCmp) throws HyracksDataException { - int mid; - int begin = 0; - int end = numElements - 1; + // If the given element is greater than the last element in the current buffer, reads the next block. + if (needToReadNextBlock(searchTuple, invListCmp)) { + loadPages(); + } + int mid = -1; + int begin = lastRandomSearchedElementIx; + int end = bufferEndElementIx; while (begin <= end) { mid = (begin + end) / 2; - positionCursor(mid); + getElementAtIndex(mid, tuple); int cmp = invListCmp.compare(searchTuple, tuple); if (cmp < 0) { end = mid - 1; } else if (cmp > 0) { begin = mid + 1; } else { + lastRandomSearchedElementIx = mid; return true; } } + lastRandomSearchedElementIx = mid; return false; } + /** + * Opens the cursor for the given inverted list. After this open() call, prepreLoadPages() should be called + * before loadPages() are called. For more details, check prepapreLoadPages(). + */ @Override - public void reset(int startPageId, int endPageId, int startOff, int numElements) { + protected void setInvListInfo(int startPageId, int endPageId, int startOff, int numElements) + throws HyracksDataException { this.startPageId = startPageId; this.endPageId = endPageId; this.startOff = startOff; this.numElements = numElements; - this.currentElementIx = 0; - this.currentPageIx = 0; - this.currentOff = startOff - elementSize; - - int numPages = endPageId - startPageId + 1; - if (numPages > pages.length) { - pages = new ICachedPage[endPageId - startPageId + 1]; - elementIndexes = new int[endPageId - startPageId + 1]; + this.currentElementIxForScan = 0; + this.currentPageIxForScan = 0; + this.currentOffsetForScan = startOff - elementSize; + this.bufferStartPageId = startPageId; + // Deducts 1 since the startPage would be set to bufferEndPageId + 1 in loadPages(). + this.bufferEndPageId = startPageId - 1; + this.moreBlocksToRead = true; + this.numPages = endPageId - startPageId + 1; + + if (numPages > elementIndexes.length) { + elementIndexes = new int[numPages]; + } + + for (ByteBuffer buffer : buffers) { + buffer.clear(); } - // fill elementIndexes + // Fills the last element index per page. // first page int cumulElements = (bufferCache.getPageSize() - startOff) / elementSize; + // Deducts 1 because this is the index, not the number of elements. elementIndexes[0] = cumulElements - 1; // middle, full pages @@ -176,19 +375,23 @@ public class FixedSizeElementInvertedListCursor implements IInvertedListCursor { } // last page + // Deducts 1 because this is the index, not the number of elements. elementIndexes[numPages - 1] = numElements - 1; } + /** + * Prints the contents of the current inverted list (a debugging method). + */ @SuppressWarnings("rawtypes") @Override public String printInvList(ISerializerDeserializer[] serdes) throws HyracksDataException { - int oldCurrentOff = currentOff; - int oldCurrentPageId = currentPageIx; - int oldCurrentElementIx = currentElementIx; + int oldCurrentOff = currentOffsetForScan; + int oldCurrentPageId = currentPageIxForScan; + int oldCurrentElementIx = currentElementIxForScan; - currentOff = startOff - elementSize; - currentPageIx = 0; - currentElementIx = 0; + currentOffsetForScan = startOff - elementSize; + currentPageIxForScan = 0; + currentElementIxForScan = 0; StringBuilder strBuilder = new StringBuilder(); @@ -208,13 +411,16 @@ public class FixedSizeElementInvertedListCursor implements IInvertedListCursor { } // reset previous state - currentOff = oldCurrentOff; - currentPageIx = oldCurrentPageId; - currentElementIx = oldCurrentElementIx; + currentOffsetForScan = oldCurrentOff; + currentPageIxForScan = oldCurrentPageId; + currentElementIxForScan = oldCurrentElementIx; return strBuilder.toString(); } + /** + * Prints the current element (a debugging method). + */ @Override @SuppressWarnings("rawtypes") public String printCurrentElement(ISerializerDeserializer[] serdes) throws HyracksDataException { @@ -232,6 +438,9 @@ public class FixedSizeElementInvertedListCursor implements IInvertedListCursor { return strBuilder.toString(); } + /** + * Conducts a binary search to get the index of the given key. + */ private int binarySearch(int[] arr, int arrStart, int arrLength, int key) { int mid; int begin = arrStart; @@ -259,8 +468,11 @@ public class FixedSizeElementInvertedListCursor implements IInvertedListCursor { } } + /** + * A compare function that is used to sort inverted list cursors + */ @Override - public int compareTo(IInvertedListCursor invListCursor) { + public int compareTo(InvertedListCursor invListCursor) { try { return numElements - invListCursor.size(); } catch (HyracksDataException hde) { @@ -268,36 +480,40 @@ public class FixedSizeElementInvertedListCursor implements IInvertedListCursor { } } - @Override - public int getEndPageId() { - return endPageId; - } - + /** + * Gets the cardinality of the current inverted list. + */ @Override public int size() { return numElements; } + /** + * Gets the current tuple. + */ @Override - public int getStartOff() { - return startOff; + public ITupleReference doGetTuple() { + return tuple; } + /** + * Closes the cursor. + */ @Override - public int getStartPageId() { - return startPageId; - } - - public int getOffset() { - return currentOff; - } - - public ICachedPage getPage() { - return pages[currentPageIx]; + public void doClose() throws HyracksDataException { + if (!buffers.isEmpty()) { + unloadPages(); + } } + /** + * Destroys the cursor. + */ @Override - public ITupleReference getTuple() { - return tuple; + public void doDestroy() throws HyracksDataException { + if (!buffers.isEmpty()) { + unloadPages(); + } } + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeElementInvertedListScanCursor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeElementInvertedListScanCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeElementInvertedListScanCursor.java new file mode 100644 index 0000000..ca0f40b --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeElementInvertedListScanCursor.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.storage.am.lsm.invertedindex.ondisk; + +import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; +import org.apache.hyracks.api.dataflow.value.ITypeTraits; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +import org.apache.hyracks.storage.am.lsm.invertedindex.api.InvertedListCursor; +import org.apache.hyracks.storage.common.MultiComparator; +import org.apache.hyracks.storage.common.buffercache.IBufferCache; +import org.apache.hyracks.storage.common.buffercache.ICachedPage; +import org.apache.hyracks.storage.common.file.BufferedFileHandle; + +/** + * A simple scan cursor that only reads a frame by frame from the inverted list. This cursor does not + * conduct a binary search. It only supports the scan operation. The main purpose of this cursor is + * doing a full-scan of an inverted list during a storage-component-merge process. + */ +public class FixedSizeElementInvertedListScanCursor extends InvertedListCursor { + + protected final IBufferCache bufferCache; + protected final int fileId; + protected final int elementSize; + protected int currentElementIxForScan; + protected int currentOffsetForScan; + protected int currentPageId; + + protected int startPageId; + protected int endPageId; + protected int startOff; + protected int numElements; + protected int numPages; + + protected final FixedSizeTupleReference tuple; + protected ICachedPage page; + + protected boolean pinned; + + public FixedSizeElementInvertedListScanCursor(IBufferCache bufferCache, int fileId, ITypeTraits[] invListFields) + throws HyracksDataException { + this.bufferCache = bufferCache; + this.fileId = fileId; + int tmpSize = 0; + for (int i = 0; i < invListFields.length; i++) { + tmpSize += invListFields[i].getFixedLength(); + } + elementSize = tmpSize; + this.currentElementIxForScan = 0; + this.currentOffsetForScan = -elementSize; + this.currentPageId = 0; + this.startPageId = 0; + this.endPageId = 0; + this.startOff = 0; + this.numElements = 0; + this.numPages = 0; + this.tuple = new FixedSizeTupleReference(invListFields); + this.pinned = false; + } + + @Override + public boolean doHasNext() { + return currentElementIxForScan < numElements; + } + + @Override + public void doNext() throws HyracksDataException { + if (currentOffsetForScan + 2 * elementSize > bufferCache.getPageSize()) { + // Read the next page. + currentOffsetForScan = 0; + loadPages(); + } else { + currentOffsetForScan += elementSize; + } + currentElementIxForScan++; + tuple.reset(page.getBuffer().array(), currentOffsetForScan); + } + + @Override + public void prepareLoadPages() throws HyracksDataException { + // No-op for this cursor since it only loads one page to the buffer cache at a time. + } + + /** + * Loads one page from the inverted list into the buffer cache. + */ + @Override + public void loadPages() throws HyracksDataException { + if (pinned) { + unloadPages(); + } + if (currentPageId == endPageId) { + return; + } + currentPageId++; + page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, currentPageId), false); + page.acquireReadLatch(); + pinned = true; + } + + @Override + public void unloadPages() throws HyracksDataException { + if (pinned) { + page.releaseReadLatch(); + bufferCache.unpin(page); + pinned = false; + } + } + + @Override + protected void setInvListInfo(int startPageId, int endPageId, int startOff, int numElements) + throws HyracksDataException { + this.startPageId = startPageId; + this.endPageId = endPageId; + this.startOff = startOff; + this.numElements = numElements; + this.currentElementIxForScan = 0; + this.currentOffsetForScan = startOff - elementSize; + // Deducts 1 since the startPage would be set to bufferCurrentPageId + 1 in loadPages(). + this.currentPageId = startPageId - 1; + this.numPages = endPageId - startPageId + 1; + this.pinned = false; + } + + @Override + public int compareTo(InvertedListCursor invListCursor) { + try { + return numElements - invListCursor.size(); + } catch (HyracksDataException hde) { + throw new IllegalStateException(hde); + } + } + + @Override + public int size() { + return numElements; + } + + @Override + public ITupleReference doGetTuple() { + return tuple; + } + + @Override + public void doClose() throws HyracksDataException { + if (pinned) { + unloadPages(); + } + } + + @Override + public void doDestroy() throws HyracksDataException { + if (pinned) { + unloadPages(); + } + } + + @Override + public boolean containsKey(ITupleReference searchTuple, MultiComparator invListCmp) throws HyracksDataException { + // This method is designed for a random search. + return false; + } + + @Override + public String printInvList(ISerializerDeserializer[] serdes) throws HyracksDataException { + return null; + } + + @Override + public String printCurrentElement(ISerializerDeserializer[] serdes) throws HyracksDataException { + return null; + } + +}