aliehsaeedii commented on code in PR #14957: URL: https://github.com/apache/kafka/pull/14957#discussion_r1420641426
########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java: ########## @@ -340,32 +355,67 @@ public SegmentSearchResult find(final long timestamp, final boolean includeValue } @Override - public List<SegmentSearchResult> findAll(final long fromTime, final long toTime) { - long currNextTimestamp = nextTimestamp; - final List<SegmentSearchResult> segmentSearchResults = new ArrayList<>(); - long currTimestamp = -1L; // choose an invalid timestamp. if this is valid, this needs to be re-worked - int currValueSize; - int currIndex = 0; - int cumValueSize = 0; - while (currTimestamp != minTimestamp) { - final int timestampSegmentIndex = 2 * TIMESTAMP_SIZE + currIndex * (TIMESTAMP_SIZE + VALUE_SIZE); + public SegmentSearchResult find(final long fromTime, final long toTime, final ResultOrder order) { + // this segment does not have any record in query specified time range + if (toTime < minTimestamp || fromTime > nextTimestamp) { + return null; + } + long currTimestamp = -1; + long currNextTimestamp = -1; + + + if (order.equals(ResultOrder.ASCENDING) && valuesStartingIndex == -1) { + findValuesStartingIndex(); + } + + while (hasStillRecord(currTimestamp, currNextTimestamp, order)) { + final int timestampSegmentIndex = getTimestampIndex(order, currentDeserIndex); currTimestamp = ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex); - currValueSize = ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE); - cumValueSize += Math.max(currValueSize, 0); + currNextTimestamp = timestampSegmentIndex == 2 * TIMESTAMP_SIZE ? 
nextTimestamp // if this is the first record metadata (timestamp + value size) + : ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex - (TIMESTAMP_SIZE + VALUE_SIZE)); + final int currValueSize = ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE); if (currValueSize >= 0) { final byte[] value = new byte[currValueSize]; - final int valueSegmentIndex = segmentValue.length - cumValueSize; + final int valueSegmentIndex = getValueSegmentIndex(order, currentCumValueSize, currValueSize); System.arraycopy(segmentValue, valueSegmentIndex, value, 0, currValueSize); if (currTimestamp <= toTime && currNextTimestamp > fromTime) { - segmentSearchResults.add(new SegmentSearchResult(currIndex, currTimestamp, currNextTimestamp, value)); + currentCumValueSize += currValueSize; + currentDeserIndex++; + return new SegmentSearchResult(currentDeserIndex - 1, currTimestamp, currNextTimestamp, value); } } - // prep for next iteration - currNextTimestamp = currTimestamp; + currentCumValueSize += Math.max(currValueSize, 0); + currentDeserIndex++; + } + // search in segment expected to find result but did not + return null; + } + + private boolean hasStillRecord(final long currTimestamp, final long currNextTimestamp, final ResultOrder order) { + return order.equals(ResultOrder.ASCENDING) ? currNextTimestamp != nextTimestamp : currTimestamp != minTimestamp; + } + + private int getValueSegmentIndex(final ResultOrder order, final int currentCumValueSize, final int currValueSize) { + return order.equals(ResultOrder.ASCENDING) ? valuesStartingIndex + currentCumValueSize + : segmentValue.length - (currentCumValueSize + currValueSize); + } + + private int getTimestampIndex(final ResultOrder order, final int currIndex) { + return order.equals(ResultOrder.ASCENDING) ? 
valuesStartingIndex - ((currIndex + 1) * (TIMESTAMP_SIZE + VALUE_SIZE)) + : 2 * TIMESTAMP_SIZE + currIndex * (TIMESTAMP_SIZE + VALUE_SIZE); + } + + private void findValuesStartingIndex() { + long currTimestamp = -1; + int currIndex = 0; + int timestampSegmentIndex = 0; + while (currTimestamp != minTimestamp) { + timestampSegmentIndex = 2 * TIMESTAMP_SIZE + currIndex * (TIMESTAMP_SIZE + VALUE_SIZE); + currTimestamp = ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex);

Review Comment:

> Given that we parse all timestamps, we should buffer them

I do agree with you.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org