mike-tr-adamson commented on code in PR #2540:
URL: https://github.com/apache/cassandra/pull/2540#discussion_r1290234735


##########
src/java/org/apache/cassandra/index/sai/disk/v1/sortedterms/SortedTermsReader.java:
##########
@@ -112,123 +108,259 @@ public SortedTermsReader(@Nonnull FileHandle 
termsDataFileHandle,
     public class Cursor implements AutoCloseable
     {
         private final IndexInputReader termsInput;
+        private final int blockShift;
+        private final int blockMask;
         private final long termsDataFp;
         private final LongArray blockOffsets;
 
         // The term the cursor currently points to. Initially empty.
         private final BytesRef currentTerm;
 
-        // The point id the cursor currently points to. BEFORE_START means 
before the first item.
-        private long pointId = BEFORE_START;
+        private final BytesRef nextBlockTerm;
+
+        // The point id the cursor currently points to.
+        private long currentPointId;
+        private long currentBlockIndex;
 
         Cursor(FileHandle termsFile, LongArray.Factory blockOffsetsFactory) 
throws IOException
         {
             this.termsInput = IndexInputReader.create(termsFile);
             SAICodecUtils.validate(this.termsInput);
+            this.blockShift = this.termsInput.readVInt();
+            this.blockMask = (1 << this.blockShift) - 1;
             this.termsDataFp = this.termsInput.getFilePointer();
             this.blockOffsets = new 
LongArray.DeferredLongArray(blockOffsetsFactory::open);
             this.currentTerm = new BytesRef(meta.maxTermLength);
+            this.nextBlockTerm = new BytesRef(meta.maxTermLength);
+            termsInput.seek(termsDataFp);
+            readTerm(currentPointId, currentTerm);
         }
 
         /**
-         * Returns the current position of the cursor.
-         * Initially, before the first call to {@link #advance}, the cursor is 
positioned at -1.
-         * After reading all the items, the cursor is positioned at index one
-         * greater than the position of the last item.
+         * Positions the cursor on the target point id and reads the term at 
target to the current term buffer.
+         * <p>
+         * It is allowed to position the cursor before the first item or after 
the last item;
+         * in these cases the internal buffer is cleared.
+         *
+         * @param nextPointId point id to lookup
+         * @return The {@link ByteComparable} containing the term
+         * @throws IndexOutOfBoundsException if the target point id is less 
than 0 or greater than the number of terms
          */
-        public long pointId()
+        public @Nonnull ByteComparable seekForwardToPointId(long nextPointId)
         {
-            return pointId;
-        }
+            if (nextPointId < 0 || nextPointId > meta.termCount)
+                throw new IndexOutOfBoundsException(String.format("The target 
point id [%s] cannot be less than 0 or " +
+                                                                  "greater 
than the term count [%s]", nextPointId, meta.termCount));
+            assert nextPointId >= currentPointId : "Attempt to seek backwards 
in seekForwardsToPointId. Next pointId was "
+                                                   + nextPointId + " while 
current pointId is " + currentPointId;
+            if (nextPointId != currentPointId)
+            {
+                long blockIndex = nextPointId >>> blockShift;
+                if (blockIndex != currentBlockIndex)
+                {
+                    currentBlockIndex = blockIndex;
+                    resetPosition();
+                }
+            }
+            while (currentPointId < nextPointId)
+            {
+                readTerm(++currentPointId, currentTerm);
+                currentBlockIndex = currentPointId >>> blockShift;
+            }
 
-        /**
-         * Returns the current term data as {@link ByteComparable} referencing 
the internal term buffer.
-         * The term data stored behind that reference is valid only until the 
next call to
-         * {@link #advance} or {@link #seekToPointId(long)}.
-         */
-        public @Nonnull ByteComparable term()
-        {
             return ByteComparable.fixedLength(currentTerm.bytes, 
currentTerm.offset, currentTerm.length);
         }
 
         /**
-         * Positions the cursor on the target point id and reads the term at 
target to the current term buffer.
-         * <p>
-         * It is allowed to position the cursor before the first item or after 
the last item;
-         * in these cases the internal buffer is cleared.
+         * Finds the pointId for a term within a range of pointIds. The start 
and end of the range must not
+         * exceed the number of terms available.
          * <p>
-         * This method has constant complexity.
-         *
-         * @param pointId point id to lookup
-         * @throws IOException if a seek and read from the terms file fails
-         * @throws IndexOutOfBoundsException if the target point id is less 
than -1 or greater than the number of terms
+         * If the term is not in the block containing the start of the range, a 
binary search is done to find
+         * the block containing the search term. That block is then searched to 
return the pointId that corresponds
+         * to the term that is either equal to or the next highest after the search term.
          */
-        public void seekToPointId(long pointId) throws IOException
+        public long partitionedSeekToTerm(ByteComparable term, long 
startingPointId, long endingPointId)
         {
-            if (pointId < 0 || pointId > meta.termCount)
-                throw new IndexOutOfBoundsException(String.format("The target 
point id [%s] cannot be less than 0 or " +
-                                                                  "greater 
than the term count [%s]", pointId, meta.termCount));
-            long blockIndex = pointId >>> TERMS_DICT_BLOCK_SHIFT;
-            long blockAddress = blockOffsets.get(blockIndex);
-            termsInput.seek(blockAddress + termsDataFp);
-            this.pointId = (blockIndex << TERMS_DICT_BLOCK_SHIFT) - 1;
-            while (this.pointId < pointId && advance());
+            BytesRef skipTerm = readBytes(term);
+
+            currentBlockIndex = startingPointId >>> blockShift;
+            resetPosition();
+
+            if (compareTerms(currentTerm, skipTerm) == 0)
+                return startingPointId;
+
+            if (notInCurrentBlock(startingPointId, skipTerm))
+            {
+                long split = (endingPointId - startingPointId) >>> blockShift;
+                long splitPointId = startingPointId;
+                while (split > 0)
+                {
+                    currentBlockIndex = Math.min((splitPointId >>> blockShift) 
+ split, blockOffsets.length() - 1);
+                    resetPosition();
+
+                    if (currentPointId >= endingPointId)
+                    {
+                        currentBlockIndex = (endingPointId - 1) >>> blockShift;
+                        resetPosition();
+                    }
+
+                    int cmp = compareTerms(currentTerm, skipTerm);
+
+                    if (cmp == 0)
+                        return currentPointId;
+
+                    if (cmp < 0)
+                        splitPointId = currentPointId;
+
+                    split /= 2;
+                }
+                // After we finish the binary search we need to move the block 
back till we hit a block that has
+                // a starting term that is less than or equals to the skip term
+                while (currentBlockIndex > 0 && compareTerms(currentTerm, 
skipTerm) > 0)
+                {
+                    currentBlockIndex--;
+                    resetPosition();
+                }
+            }
+
+            // Depending on where we are in the block we may need to move 
forwards to the starting point ID
+            while (currentPointId < startingPointId)
+            {
+                currentPointId++;
+                readTerm(currentPointId, currentTerm);
+                currentBlockIndex = currentPointId >>> blockShift;
+            }
+
+            // Move forward to the ending point ID, returning the point ID if 
we find our term
+            while (currentPointId < endingPointId)
+            {
+                if (compareTerms(currentTerm, skipTerm) >= 0)
+                    return currentPointId;
+                currentPointId++;
+                if (currentPointId == meta.termCount)
+                    return -1;
+                readTerm(currentPointId, currentTerm);
+                currentBlockIndex = currentPointId >>> blockShift;
+            }
+            return endingPointId < meta.termCount ? endingPointId : -1;
+        }
+
+        @VisibleForTesting
+        public void reset() throws IOException
+        {
+            currentPointId = 0;
+            currentBlockIndex = 0;
+            termsInput.seek(termsDataFp);
+            readTerm(currentPointId, currentTerm);
         }
 
+
         @Override
         public void close()
         {
             termsInput.close();
         }
 
-        /**
-         * Advances the cursor to the next term and reads it into the current 
term buffer.
-         * <p>
-         * If there are no more available terms, clears the term buffer and 
the cursor's position will point to the
-         * one behind the last item.
-         * <p>
-         * This method has constant time complexity.
-         *
-         * @return true if the cursor was advanced successfully, false if the 
end of file was reached
-         * @throws IOException if a read from the terms file fails
-         */
-        @VisibleForTesting
-        protected boolean advance() throws IOException
+        private boolean notInCurrentBlock(long pointId, BytesRef term)
         {
-            if (pointId >= meta.termCount || ++pointId >= meta.termCount)
-            {
-                currentTerm.length = 0;
+            if (inLastBlock(pointId) || !peekNextBlock(pointId))
                 return false;
-            }
 
-            int prefixLength;
-            int suffixLength;
-            if ((pointId & TERMS_DICT_BLOCK_MASK) == 0L)
+            resetPosition();
+
+            return compareTerms(term, nextBlockTerm) >= 0;
+        }
+
+        private boolean inLastBlock(long pointId)
+        {
+            return pointId >>> blockShift == blockOffsets.length() - 1;
+        }
+
+        // Tries to load the starting value of the next block into 
nextBlockTerm. This will return false
+        // if the pointId is in the last block.
+        private boolean peekNextBlock(long pointId)
+        {
+            long blockIndex = (pointId >>> blockShift) + 1;
+
+            if (blockIndex >= blockOffsets.length())
+                return false;
+
+            termsInput.seek(blockOffsets.get(blockIndex) + termsDataFp);
+            readTerm(blockIndex << blockShift, nextBlockTerm);
+
+            return true;
+        }
+
+        // Reset currentPointId and currentTerm to be at the start of the block
+        // pointed to by currentBlockIndex.
+        private void resetPosition()

Review Comment:
   I think `resetToCurrentBlock` works best. I've changed it to that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to