http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeFrameTupleAccessor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeFrameTupleAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeFrameTupleAccessor.java index 79033d2..23854f9 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeFrameTupleAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeFrameTupleAccessor.java @@ -25,6 +25,11 @@ import org.apache.hyracks.api.comm.FrameHelper; import org.apache.hyracks.api.comm.IFrameTupleAccessor; import org.apache.hyracks.api.dataflow.value.ITypeTraits; +/** + * This is a fixed-size tuple accessor class. + * The frame structure: [4 bytes for minimum Hyracks frame count] [fixed-size tuple 1] ... [fixed-size tuple n] ... + * [4 bytes for the tuple count in a frame] + */ public class FixedSizeFrameTupleAccessor implements IFrameTupleAccessor { private final int frameSize; @@ -82,12 +87,12 @@ public class FixedSizeFrameTupleAccessor implements IFrameTupleAccessor { @Override public int getFieldStartOffset(int tupleIndex, int fIdx) { - return tupleIndex * tupleSize + fieldStartOffsets[fIdx]; + return getTupleStartOffset(tupleIndex) + fieldStartOffsets[fIdx]; } @Override public int getTupleCount() { - return buffer.getInt(FrameHelper.getTupleCountOffset(frameSize)); + return buffer != null ? 
buffer.getInt(FrameHelper.getTupleCountOffset(frameSize)) : 0; } @Override @@ -97,7 +102,7 @@ public class FixedSizeFrameTupleAccessor implements IFrameTupleAccessor { @Override public int getTupleStartOffset(int tupleIndex) { - return tupleIndex * tupleSize; + return FixedSizeFrameTupleAppender.MINFRAME_COUNT_SIZE + tupleIndex * tupleSize; } @Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeFrameTupleAppender.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeFrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeFrameTupleAppender.java index 5dd23c4..85d8576 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeFrameTupleAppender.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeFrameTupleAppender.java @@ -20,13 +20,25 @@ package org.apache.hyracks.storage.am.lsm.invertedindex.ondisk; import java.nio.ByteBuffer; +import java.util.Arrays; import org.apache.hyracks.api.comm.FrameHelper; import org.apache.hyracks.api.dataflow.value.ITypeTraits; +/** + * An appender class for an inverted list. Each frame has two integer values at the beginning and at the end. + * The first represents the number of minimum Hyracks frames in a frame. Currently, we use 1 for this value. + * The latter represents the number of tuples in a frame. This design is required since we may need to use + * RunFileWriter and RunFileReader class during the inverted-index-search operation. + */ public class FixedSizeFrameTupleAppender { - private static final int TUPLE_COUNT_SIZE = 4; + // At the end of a frame, an integer value is written to keep the tuple count in this frame. 
+ public static final int TUPLE_COUNT_SIZE = 4; + // At the beginning of a frame, an integer value is written to keep the number of minimum frames in this frame. + // For this class, the frame size is equal to the minimum frame size in Hyracks. + public static final int MINFRAME_COUNT_SIZE = 4; + private final int frameSize; private final int tupleSize; private ByteBuffer buffer; @@ -42,13 +54,22 @@ public class FixedSizeFrameTupleAppender { tupleSize = tmp; } - public void reset(ByteBuffer buffer, boolean clear) { + public void reset(ByteBuffer buffer) { + reset(buffer, true, 0, MINFRAME_COUNT_SIZE); + } + + public void reset(ByteBuffer buffer, boolean clear, int tupleCount, int tupleDataEndOffset) { this.buffer = buffer; if (clear) { - buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), 0); - tupleCount = 0; - tupleDataEndOffset = 0; + Arrays.fill(this.buffer.array(), (byte) 0); + this.buffer.clear(); + // the number of minimum frames in a frame - it's one. + FrameHelper.serializeFrameSize(this.buffer, 1); } + // tuple count + this.buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount); + this.tupleCount = tupleCount; + this.tupleDataEndOffset = tupleDataEndOffset; } public boolean append(byte[] bytes, int offset) { @@ -128,4 +149,5 @@ public class FixedSizeFrameTupleAppender { public ByteBuffer getBuffer() { return buffer; } + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java 
index 8e8cb13..2f4f1d6 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java @@ -21,15 +21,14 @@ package org.apache.hyracks.storage.am.lsm.invertedindex.ondisk; import java.io.DataOutput; import java.io.IOException; -import java.nio.ByteBuffer; -import org.apache.hyracks.api.context.IHyracksCommonContext; +import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.ITypeTraits; import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; -import org.apache.hyracks.api.io.IIOManager; +import org.apache.hyracks.api.util.HyracksConstants; import org.apache.hyracks.data.std.primitive.IntegerPointable; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference; @@ -48,7 +47,8 @@ import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInPlaceInvertedIndex import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexAccessor; import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearcher; import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedListBuilder; -import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor; +import org.apache.hyracks.storage.am.lsm.invertedindex.api.InvertedListCursor; +import org.apache.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexSearchCursorInitialState; import org.apache.hyracks.storage.am.lsm.invertedindex.search.InvertedIndexSearchPredicate; import 
org.apache.hyracks.storage.am.lsm.invertedindex.search.TOccurrenceSearcher; import org.apache.hyracks.storage.common.IIndexAccessParameters; @@ -70,8 +70,6 @@ import org.apache.hyracks.storage.common.file.BufferedFileHandle; * cannot exceed the size of a Hyracks frame. */ public class OnDiskInvertedIndex implements IInPlaceInvertedIndex { - protected final IHyracksCommonContext ctx = new DefaultHyracksCommonContext(); - // Schema of BTree tuples, set in constructor. protected final int invListStartPageIdField; protected final int invListEndPageIdField; @@ -185,12 +183,17 @@ public class OnDiskInvertedIndex implements IInPlaceInvertedIndex { } @Override - public IInvertedListCursor createInvertedListCursor() { - return new FixedSizeElementInvertedListCursor(bufferCache, fileId, invListTypeTraits); + public InvertedListCursor createInvertedListCursor(IHyracksTaskContext ctx) throws HyracksDataException { + return new FixedSizeElementInvertedListCursor(bufferCache, fileId, invListTypeTraits, ctx); } @Override - public void openInvertedListCursor(IInvertedListCursor listCursor, ITupleReference searchKey, + public InvertedListCursor createInvertedListRangeSearchCursor() throws HyracksDataException { + return new FixedSizeElementInvertedListScanCursor(bufferCache, fileId, invListTypeTraits); + } + + @Override + public void openInvertedListCursor(InvertedListCursor listCursor, ITupleReference searchKey, IIndexOperationContext ictx) throws HyracksDataException { OnDiskInvertedIndexOpContext ctx = (OnDiskInvertedIndexOpContext) ictx; ctx.getBtreePred().setLowKeyComparator(ctx.getSearchCmp()); @@ -201,16 +204,19 @@ public class OnDiskInvertedIndex implements IInPlaceInvertedIndex { try { if (ctx.getBtreeCursor().hasNext()) { ctx.getBtreeCursor().next(); - resetInvertedListCursor(ctx.getBtreeCursor().getTuple(), listCursor); + openInvertedListCursor(ctx.getBtreeCursor().getTuple(), listCursor); } else { - listCursor.reset(0, 0, 0, 0); + 
LSMInvertedIndexSearchCursorInitialState initState = new LSMInvertedIndexSearchCursorInitialState(); + initState.setInvertedListInfo(0, 0, 0, 0); + listCursor.open(initState, null); } } finally { ctx.getBtreeCursor().close(); } } - public void resetInvertedListCursor(ITupleReference btreeTuple, IInvertedListCursor listCursor) { + public void openInvertedListCursor(ITupleReference btreeTuple, InvertedListCursor listCursor) + throws HyracksDataException { int startPageId = IntegerPointable.getInteger(btreeTuple.getFieldData(invListStartPageIdField), btreeTuple.getFieldStart(invListStartPageIdField)); int endPageId = IntegerPointable.getInteger(btreeTuple.getFieldData(invListEndPageIdField), @@ -219,7 +225,9 @@ public class OnDiskInvertedIndex implements IInPlaceInvertedIndex { btreeTuple.getFieldStart(invListStartOffField)); int numElements = IntegerPointable.getInteger(btreeTuple.getFieldData(invListNumElementsField), btreeTuple.getFieldStart(invListNumElementsField)); - listCursor.reset(startPageId, endPageId, startOff, numElements); + LSMInvertedIndexSearchCursorInitialState initState = new LSMInvertedIndexSearchCursorInitialState(); + initState.setInvertedListInfo(startPageId, endPageId, startOff, numElements); + listCursor.open(initState, null); } public final class OnDiskInvertedIndexBulkLoader implements IIndexBulkLoader { @@ -416,45 +424,48 @@ public class OnDiskInvertedIndex implements IInPlaceInvertedIndex { } public class OnDiskInvertedIndexAccessor implements IInvertedIndexAccessor { - private final OnDiskInvertedIndex index; - private final IInvertedIndexSearcher searcher; - private final IIndexOperationContext opCtx = new OnDiskInvertedIndexOpContext(btree); + protected final OnDiskInvertedIndex index; + protected final IIndexOperationContext opCtx; + protected final IHyracksTaskContext ctx; + protected IInvertedIndexSearcher searcher; private boolean destroyed = false; - public OnDiskInvertedIndexAccessor(OnDiskInvertedIndex index) throws 
HyracksDataException { - this.index = index; - this.searcher = new TOccurrenceSearcher(ctx, index); - } - - // Let subclasses initialize. - protected OnDiskInvertedIndexAccessor(OnDiskInvertedIndex index, IInvertedIndexSearcher searcher) { + public OnDiskInvertedIndexAccessor(OnDiskInvertedIndex index, IHyracksTaskContext ctx) + throws HyracksDataException { this.index = index; - this.searcher = searcher; + this.ctx = ctx; + this.opCtx = new OnDiskInvertedIndexOpContext(btree); } @Override - public IIndexCursor createSearchCursor(boolean exclusive) { - return new OnDiskInvertedIndexSearchCursor(searcher, index.getInvListTypeTraits().length); + public IIndexCursor createSearchCursor(boolean exclusive) throws HyracksDataException { + if (searcher == null) { + searcher = new TOccurrenceSearcher(index, ctx); + } + return new OnDiskInvertedIndexSearchCursor(searcher); } @Override public void search(IIndexCursor cursor, ISearchPredicate searchPred) throws HyracksDataException { - searcher.search((OnDiskInvertedIndexSearchCursor) cursor, (InvertedIndexSearchPredicate) searchPred, opCtx); + if (searcher == null) { + searcher = new TOccurrenceSearcher(index, ctx); + } + searcher.search(cursor, (InvertedIndexSearchPredicate) searchPred, opCtx); } @Override - public IInvertedListCursor createInvertedListCursor() { - return index.createInvertedListCursor(); + public InvertedListCursor createInvertedListCursor() throws HyracksDataException { + return index.createInvertedListCursor(ctx); } @Override - public void openInvertedListCursor(IInvertedListCursor listCursor, ITupleReference searchKey) + public void openInvertedListCursor(InvertedListCursor listCursor, ITupleReference searchKey) throws HyracksDataException { index.openInvertedListCursor(listCursor, searchKey, opCtx); } @Override - public IIndexCursor createRangeSearchCursor() { + public IIndexCursor createRangeSearchCursor() throws HyracksDataException { return new OnDiskInvertedIndexRangeSearchCursor(index, opCtx); } @@ 
-496,47 +507,8 @@ public class OnDiskInvertedIndex implements IInPlaceInvertedIndex { @Override public OnDiskInvertedIndexAccessor createAccessor(IIndexAccessParameters iap) throws HyracksDataException { - return new OnDiskInvertedIndexAccessor(this); - } - - // This is just a dummy hyracks context for allocating frames for temporary - // results during inverted index searches. - // TODO: In the future we should use the real HyracksTaskContext to track - // frame usage. - public static class DefaultHyracksCommonContext implements IHyracksCommonContext { - private final int FRAME_SIZE = 32768; - - @Override - public int getInitialFrameSize() { - return FRAME_SIZE; - } - - @Override - public IIOManager getIoManager() { - return null; - } - - @Override - public ByteBuffer allocateFrame() { - return ByteBuffer.allocate(FRAME_SIZE); - } - - @Override - public ByteBuffer allocateFrame(int bytes) throws HyracksDataException { - return ByteBuffer.allocate(bytes); - } - - @Override - public ByteBuffer reallocateFrame(ByteBuffer bytes, int newSizeInBytes, boolean copyOldData) - throws HyracksDataException { - throw new HyracksDataException("TODO"); - } - - @Override - public void deallocateFrames(int bytes) { - // TODO Auto-generated method stub - - } + return new OnDiskInvertedIndexAccessor(this, + (IHyracksTaskContext) iap.getParameters().get(HyracksConstants.HYRACKS_TASK_CONTEXT)); } @Override @@ -576,19 +548,21 @@ public class OnDiskInvertedIndex implements IInPlaceInvertedIndex { fieldPermutation[i] = i; } PermutingTupleReference tokenTuple = new PermutingTupleReference(fieldPermutation); + IIndexOperationContext opCtx = new OnDiskInvertedIndexOpContext(btree); // Search key for finding an inverted-list in the actual index. 
ArrayTupleBuilder prevBuilder = new ArrayTupleBuilder(invListTypeTraits.length); ArrayTupleReference prevTuple = new ArrayTupleReference(); IInvertedIndexAccessor invIndexAccessor = createAccessor(NoOpIndexAccessParameters.INSTANCE); try { - IInvertedListCursor invListCursor = invIndexAccessor.createInvertedListCursor(); + InvertedListCursor invListCursor = createInvertedListRangeSearchCursor(); MultiComparator invListCmp = MultiComparator.create(invListCmpFactories); while (btreeCursor.hasNext()) { btreeCursor.next(); tokenTuple.reset(btreeCursor.getTuple()); // Validate inverted list by checking that the elements are totally ordered. - invIndexAccessor.openInvertedListCursor(invListCursor, tokenTuple); - invListCursor.pinPages(); + openInvertedListCursor(invListCursor, tokenTuple, opCtx); + invListCursor.prepareLoadPages(); + invListCursor.loadPages(); try { if (invListCursor.hasNext()) { invListCursor.next(); @@ -607,7 +581,8 @@ public class OnDiskInvertedIndex implements IInPlaceInvertedIndex { prevTuple.reset(prevBuilder.getFieldEndOffsets(), prevBuilder.getByteArray()); } } finally { - invListCursor.unpinPages(); + invListCursor.unloadPages(); + invListCursor.close(); } } } finally { @@ -664,4 +639,5 @@ public class OnDiskInvertedIndex implements IInPlaceInvertedIndex { bufferCache.purgeHandle(fileId); fileId = -1; } + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexOpContext.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexOpContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexOpContext.java 
index 89d4e9a..267cc79 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexOpContext.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexOpContext.java @@ -39,7 +39,7 @@ public class OnDiskInvertedIndexOpContext implements IIndexOperationContext { private MultiComparator prefixSearchCmp; private boolean destroyed = false; - public OnDiskInvertedIndexOpContext(BTree btree) { + public OnDiskInvertedIndexOpContext(BTree btree) throws HyracksDataException { // TODO: Ignore opcallbacks for now. btreeAccessor = btree.createAccessor(NoOpIndexAccessParameters.INSTANCE); btreeCursor = btreeAccessor.createSearchCursor(false); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexRangeSearchCursor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexRangeSearchCursor.java index 7af35ff..a33b045 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexRangeSearchCursor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexRangeSearchCursor.java @@ -27,7 +27,7 @@ import 
org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters; import org.apache.hyracks.storage.am.common.tuples.ConcatenatingTupleReference; import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference; import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInPlaceInvertedIndex; -import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor; +import org.apache.hyracks.storage.am.lsm.invertedindex.api.InvertedListCursor; import org.apache.hyracks.storage.common.EnforcedIndexCursor; import org.apache.hyracks.storage.common.ICursorInitialState; import org.apache.hyracks.storage.common.IIndexAccessor; @@ -43,8 +43,8 @@ public class OnDiskInvertedIndexRangeSearchCursor extends EnforcedIndexCursor { private final IIndexAccessor btreeAccessor; private final IInPlaceInvertedIndex invIndex; private final IIndexOperationContext opCtx; - private final IInvertedListCursor invListCursor; - private boolean unpinNeeded; + private final InvertedListCursor invListRangeSearchCursor; + private boolean isInvListCursorOpen; private final IIndexCursor btreeCursor; private RangePredicate btreePred; @@ -52,7 +52,8 @@ public class OnDiskInvertedIndexRangeSearchCursor extends EnforcedIndexCursor { private final PermutingTupleReference tokenTuple; private ConcatenatingTupleReference concatTuple; - public OnDiskInvertedIndexRangeSearchCursor(IInPlaceInvertedIndex invIndex, IIndexOperationContext opCtx) { + public OnDiskInvertedIndexRangeSearchCursor(IInPlaceInvertedIndex invIndex, IIndexOperationContext opCtx) + throws HyracksDataException { this.btree = ((OnDiskInvertedIndex) invIndex).getBTree(); this.btreeAccessor = btree.createAccessor(NoOpIndexAccessParameters.INSTANCE); this.invIndex = invIndex; @@ -65,64 +66,59 @@ public class OnDiskInvertedIndexRangeSearchCursor extends EnforcedIndexCursor { tokenTuple = new PermutingTupleReference(fieldPermutation); btreeCursor = btreeAccessor.createSearchCursor(false); concatTuple = new 
ConcatenatingTupleReference(2); - invListCursor = invIndex.createInvertedListCursor(); - unpinNeeded = false; + invListRangeSearchCursor = invIndex.createInvertedListRangeSearchCursor(); + isInvListCursorOpen = false; } @Override public void doOpen(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException { this.btreePred = (RangePredicate) searchPred; btreeAccessor.search(btreeCursor, btreePred); - invListCursor.pinPages(); - unpinNeeded = true; + openInvListRangeSearchCursor(); } @Override public boolean doHasNext() throws HyracksDataException { - if (invListCursor.hasNext()) { - return true; - } - if (unpinNeeded) { - invListCursor.unpinPages(); - unpinNeeded = false; - } - if (!btreeCursor.hasNext()) { + // No more results possible + if (!isInvListCursorOpen) { return false; } - btreeCursor.next(); - tokenTuple.reset(btreeCursor.getTuple()); - invIndex.openInvertedListCursor(invListCursor, tokenTuple, opCtx); - invListCursor.pinPages(); - invListCursor.hasNext(); - unpinNeeded = true; - concatTuple.reset(); - concatTuple.addTuple(tokenTuple); - return true; + if (invListRangeSearchCursor.hasNext()) { + return true; + } + // The current inverted-list-range-search cursor is exhausted. 
+ invListRangeSearchCursor.unloadPages(); + invListRangeSearchCursor.close(); + isInvListCursorOpen = false; + openInvListRangeSearchCursor(); + return isInvListCursorOpen; } @Override public void doNext() throws HyracksDataException { - invListCursor.next(); + invListRangeSearchCursor.next(); if (concatTuple.hasMaxTuples()) { concatTuple.removeLastTuple(); } - concatTuple.addTuple(invListCursor.getTuple()); + concatTuple.addTuple(invListRangeSearchCursor.getTuple()); } @Override public void doDestroy() throws HyracksDataException { - if (unpinNeeded) { - invListCursor.unpinPages(); - unpinNeeded = false; + if (isInvListCursorOpen) { + invListRangeSearchCursor.unloadPages(); + invListRangeSearchCursor.destroy(); + isInvListCursorOpen = false; } btreeCursor.destroy(); } @Override public void doClose() throws HyracksDataException { - if (unpinNeeded) { - invListCursor.unpinPages(); - unpinNeeded = false; + if (isInvListCursorOpen) { + invListRangeSearchCursor.unloadPages(); + invListRangeSearchCursor.close(); + isInvListCursorOpen = false; } btreeCursor.close(); } @@ -131,4 +127,20 @@ public class OnDiskInvertedIndexRangeSearchCursor extends EnforcedIndexCursor { public ITupleReference doGetTuple() { return concatTuple; } + + // Opens an inverted-list-scan cursor for the given tuple. 
+ private void openInvListRangeSearchCursor() throws HyracksDataException { + if (btreeCursor.hasNext()) { + btreeCursor.next(); + tokenTuple.reset(btreeCursor.getTuple()); + invIndex.openInvertedListCursor(invListRangeSearchCursor, tokenTuple, opCtx); + invListRangeSearchCursor.prepareLoadPages(); + invListRangeSearchCursor.loadPages(); + concatTuple.reset(); + concatTuple.addTuple(tokenTuple); + isInvListCursorOpen = true; + } else { + isInvListCursorOpen = false; + } + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexSearchCursor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexSearchCursor.java index 0563ec9..4c521fd 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexSearchCursor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexSearchCursor.java @@ -19,94 +19,55 @@ package org.apache.hyracks.storage.am.lsm.invertedindex.ondisk; -import java.nio.ByteBuffer; -import java.util.List; - -import org.apache.hyracks.api.comm.IFrameTupleAccessor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; -import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference; import 
org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearcher; import org.apache.hyracks.storage.common.EnforcedIndexCursor; import org.apache.hyracks.storage.common.ICursorInitialState; import org.apache.hyracks.storage.common.ISearchPredicate; +/** + * A search cursor that fetches the result from an IInvertedIndexSearcher instance. + */ public class OnDiskInvertedIndexSearchCursor extends EnforcedIndexCursor { - private List<ByteBuffer> resultBuffers; - private int numResultBuffers; - private int currentBufferIndex = 0; - private int tupleIndex = 0; private final IInvertedIndexSearcher invIndexSearcher; - private final IFrameTupleAccessor fta; - private final FixedSizeTupleReference frameTuple; - private final PermutingTupleReference resultTuple; - public OnDiskInvertedIndexSearchCursor(IInvertedIndexSearcher invIndexSearcher, int numInvListFields) { + public OnDiskInvertedIndexSearchCursor(IInvertedIndexSearcher invIndexSearcher) { this.invIndexSearcher = invIndexSearcher; - this.fta = invIndexSearcher.createResultFrameTupleAccessor(); - this.frameTuple = (FixedSizeTupleReference) invIndexSearcher.createResultFrameTupleReference(); - // Project away the occurrence count from the result tuples. - int[] fieldPermutation = new int[numInvListFields]; - for (int i = 0; i < numInvListFields; i++) { - fieldPermutation[i] = i; - } - resultTuple = new PermutingTupleReference(fieldPermutation); } @Override public void doOpen(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException { - currentBufferIndex = 0; - tupleIndex = 0; - resultBuffers = invIndexSearcher.getResultBuffers(); - numResultBuffers = invIndexSearcher.getNumValidResultBuffers(); - if (numResultBuffers > 0) { - fta.reset(resultBuffers.get(0)); - } + // No-op for this cursor since all necessary information is already set in the given searcher. + // This class is just a wrapper. 
} @Override - public boolean doHasNext() { - if (currentBufferIndex < numResultBuffers && tupleIndex < fta.getTupleCount()) { - return true; - } else { - return false; - } + public boolean doHasNext() throws HyracksDataException { + return invIndexSearcher.hasNext(); } @Override - public void doNext() { - frameTuple.reset(fta.getBuffer().array(), fta.getTupleStartOffset(tupleIndex)); - resultTuple.reset(frameTuple); - tupleIndex++; - if (tupleIndex >= fta.getTupleCount()) { - if (currentBufferIndex + 1 < numResultBuffers) { - currentBufferIndex++; - fta.reset(resultBuffers.get(currentBufferIndex)); - tupleIndex = 0; - } - } + public void doNext() throws HyracksDataException { + invIndexSearcher.next(); } @Override public ITupleReference doGetTuple() { - return resultTuple; + return invIndexSearcher.getTuple(); } @Override - public void doClose() { - currentBufferIndex = 0; - tupleIndex = 0; - invIndexSearcher.reset(); - resultBuffers = invIndexSearcher.getResultBuffers(); - numResultBuffers = invIndexSearcher.getNumValidResultBuffers(); + public void doClose() throws HyracksDataException { + doDestroy(); } @Override public void doDestroy() throws HyracksDataException { - currentBufferIndex = 0; - tupleIndex = 0; - resultBuffers = null; - numResultBuffers = 0; + if (invIndexSearcher != null) { + invIndexSearcher.destroy(); + } } + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java index 064a26d..eff4f5a 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java @@ -19,23 +19,26 @@ package org.apache.hyracks.storage.am.lsm.invertedindex.ondisk; -import java.util.List; - +import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.ITypeTraits; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.api.util.HyracksConstants; import org.apache.hyracks.data.std.primitive.ShortPointable; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.am.common.api.IIndexOperationContext; import org.apache.hyracks.storage.am.common.api.IPageManagerFactory; import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearcher; import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedListBuilder; -import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor; import org.apache.hyracks.storage.am.lsm.invertedindex.api.IPartitionedInvertedIndex; +import org.apache.hyracks.storage.am.lsm.invertedindex.api.InvertedListCursor; +import org.apache.hyracks.storage.am.lsm.invertedindex.search.InvertedIndexSearchPredicate; import org.apache.hyracks.storage.am.lsm.invertedindex.search.InvertedListPartitions; import org.apache.hyracks.storage.am.lsm.invertedindex.search.PartitionedTOccurrenceSearcher; import 
org.apache.hyracks.storage.common.IIndexAccessParameters; +import org.apache.hyracks.storage.common.IIndexCursor; +import org.apache.hyracks.storage.common.ISearchPredicate; import org.apache.hyracks.storage.common.buffercache.IBufferCache; public class PartitionedOnDiskInvertedIndex extends OnDiskInvertedIndex implements IPartitionedInvertedIndex { @@ -51,21 +54,39 @@ public class PartitionedOnDiskInvertedIndex extends OnDiskInvertedIndex implemen } public class PartitionedOnDiskInvertedIndexAccessor extends OnDiskInvertedIndexAccessor { - public PartitionedOnDiskInvertedIndexAccessor(OnDiskInvertedIndex index) throws HyracksDataException { - super(index, new PartitionedTOccurrenceSearcher(ctx, index)); + public PartitionedOnDiskInvertedIndexAccessor(OnDiskInvertedIndex index, IHyracksTaskContext ctx) + throws HyracksDataException { + super(index, ctx); + } + + @Override + public IIndexCursor createSearchCursor(boolean exclusive) throws HyracksDataException { + if (searcher == null) { + searcher = new PartitionedTOccurrenceSearcher(index, ctx); + } + return new OnDiskInvertedIndexSearchCursor(searcher); + } + + @Override + public void search(IIndexCursor cursor, ISearchPredicate searchPred) throws HyracksDataException { + if (searcher == null) { + searcher = new PartitionedTOccurrenceSearcher(index, ctx); + } + searcher.search(cursor, (InvertedIndexSearchPredicate) searchPred, opCtx); } } @Override public PartitionedOnDiskInvertedIndexAccessor createAccessor(IIndexAccessParameters iap) throws HyracksDataException { - return new PartitionedOnDiskInvertedIndexAccessor(this); + return new PartitionedOnDiskInvertedIndexAccessor(this, + (IHyracksTaskContext) iap.getParameters().get(HyracksConstants.HYRACKS_TASK_CONTEXT)); } @Override public boolean openInvertedListPartitionCursors(IInvertedIndexSearcher searcher, IIndexOperationContext ictx, - short numTokensLowerBound, short numTokensUpperBound, InvertedListPartitions invListPartitions, - List<IInvertedListCursor> 
cursorsOrderedByTokens) throws HyracksDataException { + short numTokensLowerBound, short numTokensUpperBound, InvertedListPartitions invListPartitions) + throws HyracksDataException { PartitionedTOccurrenceSearcher partSearcher = (PartitionedTOccurrenceSearcher) searcher; OnDiskInvertedIndexOpContext ctx = (OnDiskInvertedIndexOpContext) ictx; ITupleReference lowSearchKey = null; @@ -95,9 +116,8 @@ public class PartitionedOnDiskInvertedIndex extends OnDiskInvertedIndex implemen ITupleReference btreeTuple = ctx.getBtreeCursor().getTuple(); short numTokens = ShortPointable.getShort(btreeTuple.getFieldData(PARTITIONING_NUM_TOKENS_FIELD), btreeTuple.getFieldStart(PARTITIONING_NUM_TOKENS_FIELD)); - IInvertedListCursor invListCursor = partSearcher.getCachedInvertedListCursor(); - resetInvertedListCursor(btreeTuple, invListCursor); - cursorsOrderedByTokens.add(invListCursor); + InvertedListCursor invListCursor = partSearcher.getCachedInvertedListCursor(); + openInvertedListCursor(btreeTuple, invListCursor); invListPartitions.addInvertedListCursor(invListCursor, numTokens); tokenExists = true; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/AbstractTOccurrenceSearcher.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/AbstractTOccurrenceSearcher.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/AbstractTOccurrenceSearcher.java index 294eb04..ff9a2f1 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/AbstractTOccurrenceSearcher.java +++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/AbstractTOccurrenceSearcher.java @@ -25,22 +25,25 @@ import java.util.List; import org.apache.hyracks.api.comm.IFrame; import org.apache.hyracks.api.comm.IFrameTupleAccessor; -import org.apache.hyracks.api.comm.VSizeFrame; -import org.apache.hyracks.api.context.IHyracksCommonContext; +import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.util.HyracksConstants; import org.apache.hyracks.data.std.primitive.IntegerPointable; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppenderAccessor; import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer; +import org.apache.hyracks.dataflow.common.utils.TaskUtil; +import org.apache.hyracks.dataflow.std.buffermanager.BufferManagerBackedVSizeFrame; +import org.apache.hyracks.dataflow.std.buffermanager.ISimpleFrameBufferManager; import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInPlaceInvertedIndex; import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearcher; -import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor; import org.apache.hyracks.storage.am.lsm.invertedindex.api.IObjectFactory; +import org.apache.hyracks.storage.am.lsm.invertedindex.api.InvertedListCursor; import org.apache.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeFrameTupleAccessor; import 
org.apache.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeTupleReference; import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.DelimitedUTF8StringBinaryTokenizer; @@ -57,10 +60,12 @@ public abstract class AbstractTOccurrenceSearcher implements IInvertedIndexSearc protected final int OBJECT_CACHE_INIT_SIZE = 10; protected final int OBJECT_CACHE_EXPAND_SIZE = 10; - protected final IHyracksCommonContext ctx; + protected final IHyracksTaskContext ctx; protected final InvertedListMerger invListMerger; - protected final SearchResult searchResult; + // Final search result is needed because multiple merge() calls can happen. + // We can't just use one of intermediate results as the final search result. + protected final InvertedIndexFinalSearchResult finalSearchResult; protected final IInPlaceInvertedIndex invIndex; protected final MultiComparator invListCmp; @@ -71,28 +76,51 @@ public abstract class AbstractTOccurrenceSearcher implements IInvertedIndexSearc protected int occurrenceThreshold; - protected final IObjectFactory<IInvertedListCursor> invListCursorFactory; - protected final ObjectCache<IInvertedListCursor> invListCursorCache; + protected final IObjectFactory<InvertedListCursor> invListCursorFactory; + protected final ObjectCache<InvertedListCursor> invListCursorCache; - public AbstractTOccurrenceSearcher(IHyracksCommonContext ctx, IInPlaceInvertedIndex invIndex) + protected final ISimpleFrameBufferManager bufferManager; + protected boolean isFinishedSearch; + + // For a single inverted list case + protected InvertedListCursor singleInvListCursor; + protected boolean isSingleInvertedList; + + // To read the final search result + protected ByteBuffer searchResultBuffer; + protected int searchResultTupleIndex = 0; + protected final IFrameTupleAccessor searchResultFta; + protected FixedSizeTupleReference searchResultTuple; + + public AbstractTOccurrenceSearcher(IInPlaceInvertedIndex invIndex, IHyracksTaskContext ctx) throws HyracksDataException { 
- this.ctx = ctx; - this.invListMerger = new InvertedListMerger(ctx, invIndex); - this.searchResult = new SearchResult(invIndex.getInvListTypeTraits(), ctx); this.invIndex = invIndex; + this.ctx = ctx; + if (ctx == null) { + throw HyracksDataException.create(ErrorCode.CANNOT_CONTINUE_TEXT_SEARCH_HYRACKS_TASK_IS_NULL); + } + this.bufferManager = TaskUtil.get(HyracksConstants.INVERTED_INDEX_SEARCH_FRAME_MANAGER, ctx); + if (bufferManager == null) { + throw HyracksDataException.create(ErrorCode.CANNOT_CONTINUE_TEXT_SEARCH_BUFFER_MANAGER_IS_NULL); + } + this.finalSearchResult = + new InvertedIndexFinalSearchResult(invIndex.getInvListTypeTraits(), ctx, bufferManager); + this.invListMerger = new InvertedListMerger(ctx, invIndex, bufferManager); this.invListCmp = MultiComparator.create(invIndex.getInvListCmpFactories()); - this.invListCursorFactory = new InvertedListCursorFactory(invIndex); + this.invListCursorFactory = new InvertedListCursorFactory(invIndex, ctx); this.invListCursorCache = new ObjectCache<>(invListCursorFactory, OBJECT_CACHE_INIT_SIZE, OBJECT_CACHE_EXPAND_SIZE); - this.queryTokenFrame = new VSizeFrame(ctx); + this.queryTokenFrame = new BufferManagerBackedVSizeFrame(ctx, bufferManager); + if (queryTokenFrame.getBuffer() == null) { + throw HyracksDataException.create(ErrorCode.NOT_ENOUGH_BUDGET_FOR_TEXTSEARCH, + this.getClass().getSimpleName()); + } this.queryTokenAppender = new FrameTupleAppenderAccessor(QUERY_TOKEN_REC_DESC); this.queryTokenAppender.reset(queryTokenFrame, true); - } - - @Override - public void reset() { - searchResult.clear(); - invListMerger.reset(); + this.isSingleInvertedList = false; + this.searchResultTuple = new FixedSizeTupleReference(invIndex.getInvListTypeTraits()); + this.searchResultFta = + new FixedSizeFrameTupleAccessor(ctx.getInitialFrameSize(), invIndex.getInvListTypeTraits()); } protected void tokenizeQuery(InvertedIndexSearchPredicate searchPred) throws HyracksDataException { @@ -100,7 +128,7 @@ public abstract class 
AbstractTOccurrenceSearcher implements IInvertedIndexSearc int queryFieldIndex = searchPred.getQueryFieldIndex(); IBinaryTokenizer queryTokenizer = searchPred.getQueryTokenizer(); // Is this a full-text query? - // Then, the last argument is conjuctive or disjunctive search option, not a query text. + // Then, the last argument is conjunctive or disjunctive search option, not a query text. // Thus, we need to remove the last argument. boolean isFullTextSearchQuery = searchPred.getIsFullTextSearchQuery(); // Get the type of query tokenizer. @@ -144,33 +172,13 @@ public abstract class AbstractTOccurrenceSearcher implements IInvertedIndexSearc } } - @Override - public IFrameTupleAccessor createResultFrameTupleAccessor() { - return new FixedSizeFrameTupleAccessor(ctx.getInitialFrameSize(), searchResult.getTypeTraits()); - } - - @Override - public ITupleReference createResultFrameTupleReference() { - return new FixedSizeTupleReference(searchResult.getTypeTraits()); - } - - @Override - public List<ByteBuffer> getResultBuffers() { - return searchResult.getBuffers(); - } - - @Override - public int getNumValidResultBuffers() { - return searchResult.getCurrentBufferIndex() + 1; - } - public int getOccurrenceThreshold() { return occurrenceThreshold; } public void printNewResults(int maxResultBufIdx, List<ByteBuffer> buffer) { StringBuffer strBuffer = new StringBuffer(); - FixedSizeFrameTupleAccessor resultFrameTupleAcc = searchResult.getAccessor(); + FixedSizeFrameTupleAccessor resultFrameTupleAcc = finalSearchResult.getAccessor(); for (int i = 0; i <= maxResultBufIdx; i++) { ByteBuffer testBuf = buffer.get(i); resultFrameTupleAcc.reset(testBuf); @@ -183,4 +191,99 @@ public abstract class AbstractTOccurrenceSearcher implements IInvertedIndexSearc } System.out.println(strBuffer.toString()); } + + /** + * Checks whether underlying the inverted list cursor or final search result has a tuple to return. 
+ */ + @Override + public boolean hasNext() throws HyracksDataException { + do { + boolean moreToRead = hasMoreElement(); + if (moreToRead) { + return true; + } + // Current cursor or buffer is exhausted. Unbinds the inverted list cursor or + // cleans the output buffer of the final search result. + resetResultSource(); + // Search is done? Then, there's nothing left. + if (isFinishedSearch) { + return false; + } + // Otherwise, resume the search process. + continueSearch(); + } while (true); + } + + @Override + public void next() throws HyracksDataException { + // Case 1: fetching a tuple from an inverted list cursor + if (isSingleInvertedList) { + singleInvListCursor.next(); + } else { + // Case 2: fetching a tuple from the output frame of a final search result + searchResultTuple.reset(searchResultFta.getBuffer().array(), + searchResultFta.getTupleStartOffset(searchResultTupleIndex)); + searchResultTupleIndex++; + } + } + + private boolean hasMoreElement() throws HyracksDataException { + // Case #1: single inverted list cursor + if (isSingleInvertedList) { + return singleInvListCursor.hasNext(); + } + // Case #2: ouput buffer from a final search result + return searchResultTupleIndex < searchResultFta.getTupleCount(); + } + + private void resetResultSource() throws HyracksDataException { + if (isSingleInvertedList) { + isSingleInvertedList = false; + singleInvListCursor.unloadPages(); + singleInvListCursor.close(); + singleInvListCursor = null; + } else { + finalSearchResult.resetBuffer(); + searchResultTupleIndex = 0; + } + } + + public void destroy() throws HyracksDataException { + // To ensure to release the buffer of the query token frame. + ((BufferManagerBackedVSizeFrame) queryTokenFrame).destroy(); + + // Releases the frames of the cursor. + if (isSingleInvertedList && singleInvListCursor != null) { + singleInvListCursor.unloadPages(); + singleInvListCursor.close(); + } + // Releases the frame of the final search result. 
+ finalSearchResult.close(); + + // Releases the frames of the two intermediate search result. + invListMerger.close(); + } + + @Override + public ITupleReference getTuple() { + if (isSingleInvertedList) { + return singleInvListCursor.getTuple(); + } + return searchResultTuple; + } + + /** + * Prepares the search process. This mainly allocates/clears the buffer frames of the each component. + */ + protected void prepareSearch() throws HyracksDataException { + finalSearchResult.prepareIOBuffer(); + invListMerger.prepareMerge(); + ((BufferManagerBackedVSizeFrame) queryTokenFrame).acquireFrame(); + isFinishedSearch = false; + isSingleInvertedList = false; + searchResultFta.reset(finalSearchResult.getNextFrame()); + searchResultTupleIndex = 0; + singleInvListCursor = null; + } + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/InvertedIndexFinalSearchResult.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/InvertedIndexFinalSearchResult.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/InvertedIndexFinalSearchResult.java new file mode 100644 index 0000000..55acb33 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/InvertedIndexFinalSearchResult.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.storage.am.lsm.invertedindex.search; + +import java.nio.ByteBuffer; + +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.ITypeTraits; +import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +import org.apache.hyracks.dataflow.std.buffermanager.ISimpleFrameBufferManager; + +/** + * This is an in-memory based storage for final results of inverted-index searches. + * Only one frame is used at a time. The same frame will be used multiple times. + */ +public class InvertedIndexFinalSearchResult extends InvertedIndexSearchResult { + + public InvertedIndexFinalSearchResult(ITypeTraits[] invListFields, IHyracksTaskContext ctx, + ISimpleFrameBufferManager bufferManager) throws HyracksDataException { + super(invListFields, ctx, bufferManager); + } + + /** + * The final search result only needs to keep the inverted list fields, not its count. 
+ */ + @Override + protected void initTypeTraits(ITypeTraits[] invListFields) { + typeTraits = new ITypeTraits[invListFields.length]; + int tmp = 0; + for (int i = 0; i < invListFields.length; i++) { + typeTraits[i] = invListFields[i]; + tmp += invListFields[i].getFixedLength(); + } + invListElementSize = tmp; + } + + /** + * Prepares the write operation. A result of the final search result will be always in memory. + */ + @Override + public void prepareWrite(int numExpectedPages) throws HyracksDataException { + // Final search result: we will use the ioBuffer and we will not create any file. + // This method can be called multiple times in case of the partitioned T-Occurrence search. + // For those cases, if the write process has already begun, we should not clear the buffer. + isInMemoryOpMode = true; + isFileOpened = false; + resetAppenderLocation(IO_BUFFER_IDX); + isWriteFinished = false; + } + + /** + * Appends an element to the frame of this result. When processing the final list, + * it does not create an additional frame when a frame becomes full to let the caller consume the frame. + * + * @return false if the current frame for the final result is full. + * true otherwise. + */ + @Override + public boolean append(ITupleReference invListElement, int count) throws HyracksDataException { + // Pauses the addition of this tuple if the current page is full. + if (!appender.hasSpace()) { + return false; + } + // Appends the given inverted-list element. + if (!appender.append(invListElement.getFieldData(0), invListElement.getFieldStart(0), invListElementSize)) { + throw HyracksDataException.create(ErrorCode.CANNOT_ADD_ELEMENT_TO_INVERTED_INDEX_SEARCH_RESULT); + } + appender.incrementTupleCount(1); + numResults++; + + return true; + } + + /** + * Finalizes the write operation. + */ + @Override + public void finalizeWrite() throws HyracksDataException { + if (isWriteFinished) { + return; + } + isWriteFinished = true; + } + + /** + * Prepares a read operation. 
+ */ + @Override + public void prepareResultRead() throws HyracksDataException { + if (isInReadMode) { + return; + } + currentReaderBufIdx = 0; + isInReadMode = true; + } + + /** + * Gets the next frame of the current result file. + */ + @Override + public ByteBuffer getNextFrame() throws HyracksDataException { + return buffers.get(IO_BUFFER_IDX); + } + + /** + * Finishes reading the result and frees the buffer. + */ + @Override + public void closeResultRead(boolean deallocateIOBufferNeeded) throws HyracksDataException { + // Deallocates I/O buffer if requested. + if (deallocateIOBufferNeeded) { + deallocateIOBuffer(); + } + } + + /** + * Deallocates the buffer. + */ + @Override + public void close() throws HyracksDataException { + deallocateIOBuffer(); + } + + @Override + public void reset() throws HyracksDataException { + // Resets the I/O buffer. + clearBuffer(ioBuffer); + + searchResultWriter = null; + searchResultReader = null; + isInReadMode = false; + isWriteFinished = false; + isInMemoryOpMode = false; + isFileOpened = false; + currentWriterBufIdx = 0; + currentReaderBufIdx = 0; + numResults = 0; + } + + /** + * Deallocates the I/O buffer (one frame). This should be the last operation. + */ + @Override + protected void deallocateIOBuffer() throws HyracksDataException { + if (ioBufferFrame != null) { + bufferManager.releaseFrame(ioBuffer); + buffers.clear(); + ioBufferFrame = null; + ioBuffer = null; + } + } + + /** + * Resets the buffer. 
+ */ + public void resetBuffer() { + appender.reset(buffers.get(IO_BUFFER_IDX)); + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/InvertedIndexSearchResult.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/InvertedIndexSearchResult.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/InvertedIndexSearchResult.java new file mode 100644 index 0000000..527d624 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/InvertedIndexSearchResult.java @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hyracks.storage.am.lsm.invertedindex.search; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.ListIterator; + +import org.apache.hyracks.api.comm.IFrame; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.ITypeTraits; +import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.data.std.primitive.IntegerPointable; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +import org.apache.hyracks.dataflow.common.io.RunFileReader; +import org.apache.hyracks.dataflow.common.io.RunFileWriter; +import org.apache.hyracks.dataflow.std.buffermanager.BufferManagerBackedVSizeFrame; +import org.apache.hyracks.dataflow.std.buffermanager.ISimpleFrameBufferManager; +import org.apache.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeFrameTupleAccessor; +import org.apache.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeFrameTupleAppender; +import org.apache.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeTupleReference; + +/** + * Disk-based or in-memory based storage for intermediate and final results of inverted-index + * searches. One frame is dedicated to I/O operation for disk operation mode. + */ +public class InvertedIndexSearchResult { + // The size of count field for each element. Currently, we use an integer value. 
+ protected static final int ELEMENT_COUNT_SIZE = 4; + // I/O buffer's index in the buffers + protected static final int IO_BUFFER_IDX = 0; + protected static final String FILE_PREFIX = "InvertedIndexSearchResult"; + protected final IHyracksTaskContext ctx; + protected final FixedSizeFrameTupleAppender appender; + protected final FixedSizeFrameTupleAccessor accessor; + protected final FixedSizeTupleReference tuple; + protected final ISimpleFrameBufferManager bufferManager; + protected ITypeTraits[] typeTraits; + protected int invListElementSize; + + protected int currentWriterBufIdx; + protected int currentReaderBufIdx; + protected int numResults; + protected int numPossibleElementPerPage; + // Read and Write I/O buffer + protected IFrame ioBufferFrame = null; + protected ByteBuffer ioBuffer = null; + // Buffers for in-memory operation mode. The first buffer is the ioBuffer. + // In case of the final search result, we will use only use the first buffer. No file will be created. + protected ArrayList<ByteBuffer> buffers; + + protected RunFileWriter searchResultWriter; + protected RunFileReader searchResultReader; + protected boolean isInMemoryOpMode; + protected boolean isInReadMode; + protected boolean isWriteFinished; + protected boolean isFileOpened; + + public InvertedIndexSearchResult(ITypeTraits[] invListFields, IHyracksTaskContext ctx, + ISimpleFrameBufferManager bufferManager) throws HyracksDataException { + initTypeTraits(invListFields); + this.ctx = ctx; + appender = new FixedSizeFrameTupleAppender(ctx.getInitialFrameSize(), typeTraits); + accessor = new FixedSizeFrameTupleAccessor(ctx.getInitialFrameSize(), typeTraits); + tuple = new FixedSizeTupleReference(typeTraits); + this.bufferManager = bufferManager; + this.isInReadMode = false; + this.isWriteFinished = false; + this.isInMemoryOpMode = false; + this.isFileOpened = false; + this.ioBufferFrame = null; + this.ioBuffer = null; + this.buffers = null; + this.currentWriterBufIdx = 0; + 
this.currentReaderBufIdx = 0; + this.numResults = 0; + calculateNumElementPerPage(); + // Allocates one frame for read/write operation. + prepareIOBuffer(); + } + + /** + * Initializes the element type in the search result. In addition to the element, we will keep one more integer + * per element to keep its occurrence count. + */ + protected void initTypeTraits(ITypeTraits[] invListFields) { + typeTraits = new ITypeTraits[invListFields.length + 1]; + int tmp = 0; + for (int i = 0; i < invListFields.length; i++) { + typeTraits[i] = invListFields[i]; + tmp += invListFields[i].getFixedLength(); + } + invListElementSize = tmp; + // Integer for counting occurrences. + typeTraits[invListFields.length] = IntegerPointable.TYPE_TRAITS; + } + + /** + * Prepares the write operation. Tries to allocate buffers for the expected number of pages. + * If that is possible, all operations will be executed in memory. + * If not, all operations will use a file on disk except for the final search result. + * A result of the final search result will be always in memory. + */ + public void prepareWrite(int numExpectedPages) throws HyracksDataException { + if (isInReadMode || isWriteFinished || searchResultWriter != null) { + return; + } + // Intermediate results? disk or in-memory based + // Allocates more buffers. + isInMemoryOpMode = tryAllocateBuffers(numExpectedPages); + if (!isInMemoryOpMode) { + // Not enough number of buffers. Switch to the file I/O mode. + createAndOpenFile(); + } + appender.reset(ioBuffer); + isWriteFinished = false; + } + + /** + * Appends an element and its count to the current frame of this result. The boolean value is necessary for + * the final search result case since the append() of that class is overriding this method. + */ + public boolean append(ITupleReference invListElement, int count) throws HyracksDataException { + ByteBuffer currentBuffer; + // Moves to the next page if the current page is full. 
+ if (!appender.hasSpace()) { + currentWriterBufIdx++; + if (isInMemoryOpMode) { + currentBuffer = buffers.get(currentWriterBufIdx); + } else { + searchResultWriter.nextFrame(ioBuffer); + currentBuffer = ioBuffer; + } + appender.reset(currentBuffer); + } + // Appends inverted-list element. + if (!appender.append(invListElement.getFieldData(0), invListElement.getFieldStart(0), invListElementSize)) { + throw HyracksDataException.create(ErrorCode.CANNOT_ADD_ELEMENT_TO_INVERTED_INDEX_SEARCH_RESULT); + } + // Appends count. + if (!appender.append(count)) { + throw HyracksDataException.create(ErrorCode.CANNOT_ADD_ELEMENT_TO_INVERTED_INDEX_SEARCH_RESULT); + } + appender.incrementTupleCount(1); + numResults++; + + // Always true for the intermediate result. An append should not fail. + return true; + } + + /** + * Finalizes the write operation. After this, no more write operation can be conducted. + */ + public void finalizeWrite() throws HyracksDataException { + if (isWriteFinished) { + return; + } + // For in-memory operation (including the final result), no specific operations are required. + // For disk-based operation, needs to close the writer. + if (!isInMemoryOpMode && searchResultWriter != null) { + searchResultWriter.nextFrame(ioBuffer); + searchResultWriter.close(); + } + isWriteFinished = true; + } + + /** + * Prepares a read operation. + */ + public void prepareResultRead() throws HyracksDataException { + if (isInReadMode) { + return; + } + // No specific operation is required for in-memory mode (including the final result). + if (!isInMemoryOpMode && searchResultWriter != null) { + if (!isWriteFinished) { + finalizeWrite(); + } + searchResultReader = searchResultWriter.createDeleteOnCloseReader(); + searchResultReader.open(); + searchResultReader.setDeleteAfterClose(true); + } + currentReaderBufIdx = 0; + isInReadMode = true; + } + + /** + * Gets the next frame of the current result file. A caller should make sure that initResultRead() is called first. 
+ */ + public ByteBuffer getNextFrame() throws HyracksDataException { + ByteBuffer returnedBuffer = null; + if (isInMemoryOpMode) { + // In-memory mode for an intermediate search result + returnedBuffer = buffers.get(currentReaderBufIdx); + currentReaderBufIdx++; + } else if (searchResultReader != null && searchResultReader.nextFrame(ioBufferFrame)) { + // Disk-based mode for an intermediate search result + returnedBuffer = ioBufferFrame.getBuffer(); + } + return returnedBuffer; + } + + /** + * Finishes reading the result and frees the buffer. + */ + public void closeResultRead(boolean deallocateIOBufferNeeded) throws HyracksDataException { + if (isInMemoryOpMode) { + // In-memory mode? Releases all buffers for an intermediate search result. + deallocateBuffers(); + } else if (searchResultReader != null) { + // Disk mode? Closes the file handle (this should delete the file also.) + searchResultReader.close(); + } + + // Deallocates I/O buffer if requested. + if (deallocateIOBufferNeeded) { + deallocateIOBuffer(); + } + } + + public int getCurrentBufferIndex() { + return currentWriterBufIdx; + } + + public ITypeTraits[] getTypeTraits() { + return typeTraits; + } + + public int getNumResults() { + return numResults; + } + + /** + * Deletes any associated file and deallocates all buffers. + */ + public void close() throws HyracksDataException { + if (isInMemoryOpMode) { + deallocateBuffers(); + } else { + if (searchResultReader != null) { + searchResultReader.close(); + } else if (searchResultWriter != null) { + searchResultWriter.erase(); + } + } + deallocateIOBuffer(); + } + + public void reset() throws HyracksDataException { + // Removes the file if it was in the disk op mode. + if (searchResultReader != null) { + searchResultReader.close(); + } else if (searchResultWriter != null) { + searchResultWriter.erase(); + } else if (buffers.size() > 1) { + // In-memory mode? Deallocates all buffers. + deallocateBuffers(); + } + + // Resets the I/O buffer. 
+ clearBuffer(ioBuffer); + + searchResultWriter = null; + searchResultReader = null; + isInReadMode = false; + isWriteFinished = false; + isInMemoryOpMode = false; + isFileOpened = false; + currentWriterBufIdx = 0; + currentReaderBufIdx = 0; + numResults = 0; + } + + /** + * Gets the expected number of pages if all elements are created as a result. + * An assumption is that there are no common elements between the previous result and the cursor. + */ + public int getExpectedNumPages(int numExpectedElements) { + return (int) Math.ceil((double) numExpectedElements / numPossibleElementPerPage); + } + + // Gets the number of possible elements per page based on the inverted list element size. + protected void calculateNumElementPerPage() { + int frameSize = ctx.getInitialFrameSize(); + // The count of Minframe, and the count of tuples in a frame should be deducted. + frameSize = frameSize - FixedSizeFrameTupleAppender.MINFRAME_COUNT_SIZE + - FixedSizeFrameTupleAppender.TUPLE_COUNT_SIZE; + numPossibleElementPerPage = (int) Math.floor((double) frameSize / (invListElementSize + ELEMENT_COUNT_SIZE)); + } + + /** + * Allocates the buffer for read/write operation and initializes the buffers array that will be used keep a result. + */ + protected void prepareIOBuffer() throws HyracksDataException { + if (ioBufferFrame != null) { + clearBuffer(ioBuffer); + } else { + ioBufferFrame = new BufferManagerBackedVSizeFrame(ctx, bufferManager); + ioBuffer = ioBufferFrame.getBuffer(); + if (ioBuffer == null) { + // One frame should be allocated for conducting read/write + // operation. Otherwise, can't store the result. + throw HyracksDataException.create(ErrorCode.NOT_ENOUGH_BUDGET_FOR_TEXTSEARCH, + this.getClass().getSimpleName()); + } + clearBuffer(ioBuffer); + // For keeping the results in memory if possible. + buffers = new ArrayList<ByteBuffer>(); + buffers.add(ioBuffer); + } + } + + /** + * Tries to allocate buffers to accommodate the results in memory. 
+ */ + protected boolean tryAllocateBuffers(int numExpectedPages) throws HyracksDataException { + boolean allBufferAllocated = true; + while (buffers.size() < numExpectedPages) { + ByteBuffer tmpBuffer = bufferManager.acquireFrame(ctx.getInitialFrameSize()); + if (tmpBuffer == null) { + // Budget exhausted + allBufferAllocated = false; + break; + } else { + clearBuffer(tmpBuffer); + } + buffers.add(tmpBuffer); + } + return allBufferAllocated; + } + + // Creates a file for the writer. + protected void createAndOpenFile() throws HyracksDataException { + if (isInMemoryOpMode) { + // In-memory mode should not generate a file. + return; + } + if (searchResultWriter == null) { + FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(FILE_PREFIX); + searchResultWriter = new RunFileWriter(file, ctx.getIoManager()); + searchResultWriter.open(); + isFileOpened = true; + } + } + + // Deallocates the I/O buffer (one frame). This should be the last oepration. + protected void deallocateIOBuffer() throws HyracksDataException { + if (ioBufferFrame != null) { + bufferManager.releaseFrame(ioBuffer); + buffers.clear(); + ioBufferFrame = null; + ioBuffer = null; + } + } + + /** + * Deallocates the buffers. We do not remove the first buffer since it can be used as an I/O buffer. 
+ */ + protected void deallocateBuffers() throws HyracksDataException { + int toDeleteCount = buffers.size() - 1; + int deletedCount = 0; + for (ListIterator<ByteBuffer> iter = buffers.listIterator(buffers.size()); iter.hasPrevious();) { + if (deletedCount >= toDeleteCount) { + break; + } + ByteBuffer next = iter.previous(); + bufferManager.releaseFrame(next); + iter.remove(); + deletedCount++; + } + } + + public FixedSizeFrameTupleAccessor getAccessor() { + return accessor; + } + + public FixedSizeFrameTupleAppender getAppender() { + return appender; + } + + public FixedSizeTupleReference getTuple() { + return tuple; + } + + protected void clearBuffer(ByteBuffer bufferToClear) { + Arrays.fill(bufferToClear.array(), (byte) 0); + bufferToClear.clear(); + } + + protected void resetAppenderLocation(int bufferIdx) { + accessor.reset(buffers.get(bufferIdx)); + appender.reset(buffers.get(bufferIdx), false, accessor.getTupleCount(), + accessor.getTupleEndOffset(accessor.getTupleCount() - 1)); + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/InvertedListCursorFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/InvertedListCursorFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/InvertedListCursorFactory.java index 8ed80f6..d0337cb 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/InvertedListCursorFactory.java +++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/InvertedListCursorFactory.java @@ -19,20 +19,24 @@ package org.apache.hyracks.storage.am.lsm.invertedindex.search; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInPlaceInvertedIndex; -import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor; +import org.apache.hyracks.storage.am.lsm.invertedindex.api.InvertedListCursor; import org.apache.hyracks.storage.am.lsm.invertedindex.api.IObjectFactory; -public class InvertedListCursorFactory implements IObjectFactory<IInvertedListCursor> { +/** Object factory that creates inverted-list cursors for one in-place inverted index, passing the task context supplied at construction to the index on every cursor creation. */ public class InvertedListCursorFactory implements IObjectFactory<InvertedListCursor> { private final IInPlaceInvertedIndex invIndex; + /* Task context handed to IInPlaceInvertedIndex#createInvertedListCursor on every create() call. */ private final IHyracksTaskContext ctx; - public InvertedListCursorFactory(IInPlaceInvertedIndex invIndex) { + /** @param invIndex the index whose inverted-list cursors this factory creates @param ctx the task context passed to the index when each cursor is created */ public InvertedListCursorFactory(IInPlaceInvertedIndex invIndex, IHyracksTaskContext ctx) { this.invIndex = invIndex; + this.ctx = ctx; } @Override - public IInvertedListCursor create() { - return invIndex.createInvertedListCursor(); + /* Delegates to the index with the stored task context; any HyracksDataException from cursor creation propagates to the caller. */ public InvertedListCursor create() throws HyracksDataException { + return invIndex.createInvertedListCursor(ctx); } }