maedhroz commented on code in PR #2540:
URL: https://github.com/apache/cassandra/pull/2540#discussion_r1289362084
##########
src/java/org/apache/cassandra/index/sai/disk/v1/sortedterms/SortedTermsReader.java:
##########
@@ -112,123 +108,259 @@ public SortedTermsReader(@Nonnull FileHandle
termsDataFileHandle,
public class Cursor implements AutoCloseable
{
private final IndexInputReader termsInput;
+ private final int blockShift;
+ private final int blockMask;
private final long termsDataFp;
private final LongArray blockOffsets;
// The term the cursor currently points to. Initially empty.
private final BytesRef currentTerm;
- // The point id the cursor currently points to. BEFORE_START means
before the first item.
- private long pointId = BEFORE_START;
+ private final BytesRef nextBlockTerm;
+
+ // The point id the cursor currently points to.
+ private long currentPointId;
+ private long currentBlockIndex;
Cursor(FileHandle termsFile, LongArray.Factory blockOffsetsFactory)
throws IOException
{
this.termsInput = IndexInputReader.create(termsFile);
SAICodecUtils.validate(this.termsInput);
+ this.blockShift = this.termsInput.readVInt();
+ this.blockMask = (1 << this.blockShift) - 1;
this.termsDataFp = this.termsInput.getFilePointer();
this.blockOffsets = new
LongArray.DeferredLongArray(blockOffsetsFactory::open);
this.currentTerm = new BytesRef(meta.maxTermLength);
+ this.nextBlockTerm = new BytesRef(meta.maxTermLength);
+ termsInput.seek(termsDataFp);
+ readTerm(currentPointId, currentTerm);
}
/**
- * Returns the current position of the cursor.
- * Initially, before the first call to {@link #advance}, the cursor is
positioned at -1.
- * After reading all the items, the cursor is positioned at index one
- * greater than the position of the last item.
+ * Positions the cursor on the target point id and reads the term at
target to the current term buffer.
+ * <p>
+ * It is allowed to position the cursor before the first item or after
the last item;
+ * in these cases the internal buffer is cleared.
+ *
+ * @param nextPointId point id to lookup
+ * @return The {@link ByteComparable} containing the term
+ * @throws IndexOutOfBoundsException if the target point id is less
than -1 or greater than the number of terms
*/
- public long pointId()
+ public @Nonnull ByteComparable seekForwardToPointId(long nextPointId)
{
- return pointId;
- }
+ if (nextPointId < 0 || nextPointId > meta.termCount)
+ throw new IndexOutOfBoundsException(String.format("The target
point id [%s] cannot be less than 0 or " +
+ "greater
than the term count [%s]", nextPointId, meta.termCount));
+ assert nextPointId >= currentPointId : "Attempt to seek backwards
in seekForwardsToPointId. Next pointId was "
+ + nextPointId + " while
current pointId is " + currentPointId;
+ if (nextPointId != currentPointId)
+ {
+ long blockIndex = nextPointId >>> blockShift;
+ if (blockIndex != currentBlockIndex)
+ {
+ currentBlockIndex = blockIndex;
+ resetPosition();
+ }
+ }
+ while (currentPointId < nextPointId)
+ {
+ readTerm(++currentPointId, currentTerm);
+ currentBlockIndex = currentPointId >>> blockShift;
+ }
- /**
- * Returns the current term data as {@link ByteComparable} referencing
the internal term buffer.
- * The term data stored behind that reference is valid only until the
next call to
- * {@link #advance} or {@link #seekToPointId(long)}.
- */
- public @Nonnull ByteComparable term()
- {
return ByteComparable.fixedLength(currentTerm.bytes,
currentTerm.offset, currentTerm.length);
}
/**
- * Positions the cursor on the target point id and reads the term at
target to the current term buffer.
- * <p>
- * It is allowed to position the cursor before the first item or after
the last item;
- * in these cases the internal buffer is cleared.
+ * Finds the pointId for a term within a range of pointIds. The start
and end of the range must not
+ * exceed the number of terms available.
* <p>
- * This method has constant complexity.
- *
- * @param pointId point id to lookup
- * @throws IOException if a seek and read from the terms file fails
- * @throws IndexOutOfBoundsException if the target point id is less
than -1 or greater than the number of terms
+ * If the term is not in the block containing the start of the range a
binary search is done to find
+ * the block containing the search. That block is then searched to
return the pointId that corresponds
+ * to the term that either equal to or next highest to the term.
*/
- public void seekToPointId(long pointId) throws IOException
+ public long partitionedSeekToTerm(ByteComparable term, long
startingPointId, long endingPointId)
{
- if (pointId < 0 || pointId > meta.termCount)
- throw new IndexOutOfBoundsException(String.format("The target
point id [%s] cannot be less than 0 or " +
- "greater
than the term count [%s]", pointId, meta.termCount));
- long blockIndex = pointId >>> TERMS_DICT_BLOCK_SHIFT;
- long blockAddress = blockOffsets.get(blockIndex);
- termsInput.seek(blockAddress + termsDataFp);
- this.pointId = (blockIndex << TERMS_DICT_BLOCK_SHIFT) - 1;
- while (this.pointId < pointId && advance());
+ BytesRef skipTerm = readBytes(term);
+
+ currentBlockIndex = startingPointId >>> blockShift;
+ resetPosition();
+
+ if (compareTerms(currentTerm, skipTerm) == 0)
+ return startingPointId;
+
+ if (notInCurrentBlock(startingPointId, skipTerm))
+ {
+ long split = (endingPointId - startingPointId) >>> blockShift;
+ long splitPointId = startingPointId;
+ while (split > 0)
+ {
+ currentBlockIndex = Math.min((splitPointId >>> blockShift)
+ split, blockOffsets.length() - 1);
+ resetPosition();
+
+ if (currentPointId >= endingPointId)
+ {
+ currentBlockIndex = (endingPointId - 1) >>> blockShift;
+ resetPosition();
+ }
+
+ int cmp = compareTerms(currentTerm, skipTerm);
+
+ if (cmp == 0)
+ return currentPointId;
+
+ if (cmp < 0)
+ splitPointId = currentPointId;
+
+ split /= 2;
+ }
+ // After we finish the binary search we need to move the block
back till we hit a block that has
+ // a starting term that is less than or equals to the skip term
+ while (currentBlockIndex > 0 && compareTerms(currentTerm,
skipTerm) > 0)
+ {
+ currentBlockIndex--;
+ resetPosition();
+ }
+ }
+
+ // Depending on where we are in the block we may need to move
forwards to the starting point ID
+ while (currentPointId < startingPointId)
+ {
+ currentPointId++;
+ readTerm(currentPointId, currentTerm);
+ currentBlockIndex = currentPointId >>> blockShift;
+ }
+
+ // Move forward to the ending point ID, returning the point ID if
we find our term
+ while (currentPointId < endingPointId)
+ {
+ if (compareTerms(currentTerm, skipTerm) >= 0)
+ return currentPointId;
+ currentPointId++;
+ if (currentPointId == meta.termCount)
+ return -1;
+ readTerm(currentPointId, currentTerm);
+ currentBlockIndex = currentPointId >>> blockShift;
Review Comment:
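nit: We recompute `currentBlockIndex = currentPointId >>> blockShift` after
reading a term in a couple of places (here, in `seekForwardToPointId`, and in
the loop that moves forward to the starting point ID). It might be nice to pull
that into a helper such as `updateCurrentBlockIndex()` or
`advanceBlockIndex(pointId)`, or something similar.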
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]