maedhroz commented on code in PR #2540:
URL: https://github.com/apache/cassandra/pull/2540#discussion_r1289357800
##########
src/java/org/apache/cassandra/index/sai/disk/v1/sortedterms/SortedTermsReader.java:
##########
@@ -112,123 +108,259 @@ public SortedTermsReader(@Nonnull FileHandle
termsDataFileHandle,
public class Cursor implements AutoCloseable
{
private final IndexInputReader termsInput;
+ private final int blockShift;
+ private final int blockMask;
private final long termsDataFp;
private final LongArray blockOffsets;
// The term the cursor currently points to. Initially empty.
private final BytesRef currentTerm;
- // The point id the cursor currently points to. BEFORE_START means
before the first item.
- private long pointId = BEFORE_START;
+ private final BytesRef nextBlockTerm;
+
+ // The point id the cursor currently points to.
+ private long currentPointId;
+ private long currentBlockIndex;
Cursor(FileHandle termsFile, LongArray.Factory blockOffsetsFactory)
throws IOException
{
this.termsInput = IndexInputReader.create(termsFile);
SAICodecUtils.validate(this.termsInput);
+ this.blockShift = this.termsInput.readVInt();
+ this.blockMask = (1 << this.blockShift) - 1;
this.termsDataFp = this.termsInput.getFilePointer();
this.blockOffsets = new
LongArray.DeferredLongArray(blockOffsetsFactory::open);
this.currentTerm = new BytesRef(meta.maxTermLength);
+ this.nextBlockTerm = new BytesRef(meta.maxTermLength);
+ termsInput.seek(termsDataFp);
+ readTerm(currentPointId, currentTerm);
}
/**
- * Returns the current position of the cursor.
- * Initially, before the first call to {@link #advance}, the cursor is
positioned at -1.
- * After reading all the items, the cursor is positioned at index one
- * greater than the position of the last item.
+ * Positions the cursor on the target point id and reads the term at
target to the current term buffer.
+ * <p>
+ * It is allowed to position the cursor before the first item or after
the last item;
+ * in these cases the internal buffer is cleared.
+ *
+ * @param nextPointId point id to lookup
+ * @return The {@link ByteComparable} containing the term
+ * @throws IndexOutOfBoundsException if the target point id is less
than 0 or greater than the number of terms
*/
- public long pointId()
+ public @Nonnull ByteComparable seekForwardToPointId(long nextPointId)
{
- return pointId;
- }
+ if (nextPointId < 0 || nextPointId > meta.termCount)
+ throw new IndexOutOfBoundsException(String.format("The target
point id [%s] cannot be less than 0 or " +
+ "greater
than the term count [%s]", nextPointId, meta.termCount));
+ assert nextPointId >= currentPointId : "Attempt to seek backwards
in seekForwardsToPointId. Next pointId was "
+ + nextPointId + " while
current pointId is " + currentPointId;
+ if (nextPointId != currentPointId)
+ {
+ long blockIndex = nextPointId >>> blockShift;
+ if (blockIndex != currentBlockIndex)
+ {
+ currentBlockIndex = blockIndex;
+ resetPosition();
+ }
+ }
+ while (currentPointId < nextPointId)
+ {
+ readTerm(++currentPointId, currentTerm);
+ currentBlockIndex = currentPointId >>> blockShift;
+ }
- /**
- * Returns the current term data as {@link ByteComparable} referencing
the internal term buffer.
- * The term data stored behind that reference is valid only until the
next call to
- * {@link #advance} or {@link #seekToPointId(long)}.
- */
- public @Nonnull ByteComparable term()
- {
return ByteComparable.fixedLength(currentTerm.bytes,
currentTerm.offset, currentTerm.length);
}
/**
- * Positions the cursor on the target point id and reads the term at
target to the current term buffer.
- * <p>
- * It is allowed to position the cursor before the first item or after
the last item;
- * in these cases the internal buffer is cleared.
+ * Finds the pointId for a term within a range of pointIds. The start
and end of the range must not
+ * exceed the number of terms available.
* <p>
- * This method has constant complexity.
- *
- * @param pointId point id to lookup
- * @throws IOException if a seek and read from the terms file fails
- * @throws IndexOutOfBoundsException if the target point id is less
than -1 or greater than the number of terms
+ * If the term is not in the block containing the start of the range, a
binary search is done to find
+ * the block containing the search term. That block is then searched to
return the pointId that corresponds
+ * to the term that is either equal to or the next highest after the search term.
*/
- public void seekToPointId(long pointId) throws IOException
+ public long partitionedSeekToTerm(ByteComparable term, long
startingPointId, long endingPointId)
{
- if (pointId < 0 || pointId > meta.termCount)
- throw new IndexOutOfBoundsException(String.format("The target
point id [%s] cannot be less than 0 or " +
- "greater
than the term count [%s]", pointId, meta.termCount));
- long blockIndex = pointId >>> TERMS_DICT_BLOCK_SHIFT;
- long blockAddress = blockOffsets.get(blockIndex);
- termsInput.seek(blockAddress + termsDataFp);
- this.pointId = (blockIndex << TERMS_DICT_BLOCK_SHIFT) - 1;
- while (this.pointId < pointId && advance());
+ BytesRef skipTerm = readBytes(term);
+
+ currentBlockIndex = startingPointId >>> blockShift;
+ resetPosition();
+
+ if (compareTerms(currentTerm, skipTerm) == 0)
+ return startingPointId;
+
+ if (notInCurrentBlock(startingPointId, skipTerm))
+ {
+ long split = (endingPointId - startingPointId) >>> blockShift;
+ long splitPointId = startingPointId;
+ while (split > 0)
+ {
+ currentBlockIndex = Math.min((splitPointId >>> blockShift)
+ split, blockOffsets.length() - 1);
+ resetPosition();
+
+ if (currentPointId >= endingPointId)
+ {
+ currentBlockIndex = (endingPointId - 1) >>> blockShift;
+ resetPosition();
+ }
+
+ int cmp = compareTerms(currentTerm, skipTerm);
+
+ if (cmp == 0)
+ return currentPointId;
+
+ if (cmp < 0)
+ splitPointId = currentPointId;
+
+ split /= 2;
+ }
+ // After we finish the binary search we need to move the block
back till we hit a block that has
+ // a starting term that is less than or equals to the skip term
+ while (currentBlockIndex > 0 && compareTerms(currentTerm,
skipTerm) > 0)
+ {
+ currentBlockIndex--;
+ resetPosition();
+ }
+ }
+
+ // Depending on where we are in the block we may need to move
forwards to the starting point ID
+ while (currentPointId < startingPointId)
+ {
+ currentPointId++;
+ readTerm(currentPointId, currentTerm);
+ currentBlockIndex = currentPointId >>> blockShift;
+ }
+
+ // Move forward to the ending point ID, returning the point ID if
we find our term
+ while (currentPointId < endingPointId)
+ {
+ if (compareTerms(currentTerm, skipTerm) >= 0)
+ return currentPointId;
+ currentPointId++;
+ if (currentPointId == meta.termCount)
+ return -1;
+ readTerm(currentPointId, currentTerm);
+ currentBlockIndex = currentPointId >>> blockShift;
+ }
+ return endingPointId < meta.termCount ? endingPointId : -1;
+ }
+
+ @VisibleForTesting
+ public void reset() throws IOException
+ {
+ currentPointId = 0;
+ currentBlockIndex = 0;
+ termsInput.seek(termsDataFp);
+ readTerm(currentPointId, currentTerm);
}
+
@Override
public void close()
{
termsInput.close();
}
- /**
- * Advances the cursor to the next term and reads it into the current
term buffer.
- * <p>
- * If there are no more available terms, clears the term buffer and
the cursor's position will point to the
- * one behind the last item.
- * <p>
- * This method has constant time complexity.
- *
- * @return true if the cursor was advanced successfully, false if the
end of file was reached
- * @throws IOException if a read from the terms file fails
- */
- @VisibleForTesting
- protected boolean advance() throws IOException
+ private boolean notInCurrentBlock(long pointId, BytesRef term)
{
- if (pointId >= meta.termCount || ++pointId >= meta.termCount)
- {
- currentTerm.length = 0;
+ if (inLastBlock(pointId) || !peekNextBlock(pointId))
return false;
- }
- int prefixLength;
- int suffixLength;
- if ((pointId & TERMS_DICT_BLOCK_MASK) == 0L)
+ resetPosition();
+
+ return compareTerms(term, nextBlockTerm) >= 0;
+ }
+
+ private boolean inLastBlock(long pointId)
+ {
+ return pointId >>> blockShift == blockOffsets.length() - 1;
+ }
+
+ // Tries to load the starting value of the next block into
nextBlockTerm. This will return false
+ // if the pointId is in the last block.
+ private boolean peekNextBlock(long pointId)
+ {
+ long blockIndex = (pointId >>> blockShift) + 1;
+
+ if (blockIndex >= blockOffsets.length())
+ return false;
+
+ termsInput.seek(blockOffsets.get(blockIndex) + termsDataFp);
+ readTerm(blockIndex << blockShift, nextBlockTerm);
+
+ return true;
+ }
+
+ // Reset currentPointId and currentTerm to be at the start of the block
+ // pointed to by currentBlockIndex.
+ private void resetPosition()
+ {
+ termsInput.seek(blockOffsets.get(currentBlockIndex) + termsDataFp);
+ currentPointId = currentBlockIndex << blockShift;
+ readTerm(currentPointId, currentTerm);
+ }
+
+ // Read the next term indicated by pointId.
+ //
+ // Note: pointId is only used to determine whether we are at the start
of a block. It is
+ // important that resetPosition is called prior to multiple calls to
readTerm. It is
+ // easy to get out of position.
+ private void readTerm(long pointId, BytesRef term)
+ {
+ try
{
- prefixLength = 0;
- suffixLength = termsInput.readVInt();
+ int prefixLength;
+ int suffixLength;
+ if ((pointId & blockMask) == 0L)
+ {
+ prefixLength = 0;
+ suffixLength = termsInput.readVInt();
+ }
+ else
+ {
+ // Read the prefix and suffix lengths following the
compression mechanism described
+ // in the SortedTermsWriter. If the lengths contained in
the starting byte are less
+ // than the 4 bit maximum then nothing further is read.
Otherwise, the lengths in the
+ // following vints are added.
+ int compressedLengths =
Byte.toUnsignedInt(termsInput.readByte());
+ prefixLength = compressedLengths & 0x0F;
+ suffixLength = compressedLengths >>> 4;
+ if (prefixLength == 15)
+ prefixLength += termsInput.readVInt();
+ if (suffixLength == 15)
+ suffixLength += termsInput.readVInt();
+ }
+
+ assert prefixLength + suffixLength <= meta.maxTermLength;
+ if (prefixLength + suffixLength > 0)
+ {
+ term.length = prefixLength + suffixLength;
+ // The currentTerm is appended to as the suffix for the
current term is
+ // added to the existing prefix.
+ termsInput.readBytes(term.bytes, prefixLength,
suffixLength);
+ }
}
- else
+ catch (IOException e)
{
- // Read the prefix and suffix lengths following the
compression mechanism described
- // in the SortedTermsWriter. If the lengths contained in the
starting byte are less
- // than the 4 bit maximum then nothing further is read.
Otherwise, the lengths in the
- // following vints are added.
- int compressedLengths =
Byte.toUnsignedInt(termsInput.readByte());
- prefixLength = compressedLengths & 0x0F;
- suffixLength = 1 + (compressedLengths >>> 4);
- if (prefixLength == 15)
- prefixLength += termsInput.readVInt();
- if (suffixLength == 16)
- suffixLength += termsInput.readVInt();
+ throw Throwables.cleaned(e);
}
+ }
- assert prefixLength + suffixLength <= meta.maxTermLength;
- currentTerm.length = prefixLength + suffixLength;
- // The currentTerm is appended to as the suffix for the current
term is
- // added to the existing prefix.
- termsInput.readBytes(currentTerm.bytes, prefixLength,
suffixLength);
- return true;
+ private int compareTerms(BytesRef left, BytesRef right)
+ {
+ return FastByteOperations.compareUnsigned(left.bytes, left.offset,
left.offset + left.length,
+ right.bytes,
right.offset, right.offset + right.length);
+ }
+
+ private BytesRef readBytes(ByteComparable source)
Review Comment:
```suggestion
private BytesRef asBytesRef(ByteComparable source)
```
nit: rename `readBytes` to `asBytesRef`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]