aliehsaeedii commented on code in PR #14957: URL: https://github.com/apache/kafka/pull/14957#discussion_r1424055579
########## streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java: ########## @@ -100,34 +130,48 @@ private boolean maybeFillIterator() { final byte[] rawSegmentValue = segment.get(key, snapshot); if (rawSegmentValue != null) { // this segment contains record(s) with the specified key if (segment.id() == -1) { // this is the latestValueStore - final long recordTimestamp = RocksDBVersionedStore.LatestValueFormatter.getTimestamp(rawSegmentValue); - if (recordTimestamp <= toTime) { - // latest value satisfies timestamp bound - queryResults.add(new VersionedRecord<>(RocksDBVersionedStore.LatestValueFormatter.getValue(rawSegmentValue), recordTimestamp)); - } + this.currentRawSegmentValue = rawSegmentValue; } else { - // this segment contains records with the specified key and time range - final List<RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult> searchResults = - RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue).findAll(fromTime, toTime); - for (final RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult searchResult : searchResults) { - queryResults.add(new VersionedRecord<>(searchResult.value(), searchResult.validFrom(), searchResult.validTo())); - } + this.currentDeserializedSegmentValue = new ReadonlyPartiallyDeserializedSegmentValue(rawSegmentValue); } - } - if (!queryResults.isEmpty()) { - break; + return true; } } - if (!queryResults.isEmpty()) { - // since data is stored in descending order in the segments, create the list in reverse order, if the order is Ascending. - this.iterator = order.equals(ResultOrder.ASCENDING) ? 
queryResults.listIterator(queryResults.size()) : queryResults.listIterator(); - return true; - } // if all segments have been processed, release the snapshot releaseSnapshot(); return false; } + private VersionedRecord<byte[]> getNextRecord() { + VersionedRecord<byte[]> nextRecord = null; + if (currentRawSegmentValue != null) { // this is the latestValueStore + final long recordTimestamp = RocksDBVersionedStore.LatestValueFormatter.getTimestamp(currentRawSegmentValue); + if (recordTimestamp <= toTime) { + final byte[] value = RocksDBVersionedStore.LatestValueFormatter.getValue(currentRawSegmentValue); + // latest value satisfies timestamp bound + nextRecord = new VersionedRecord<>(value, recordTimestamp); + } + } else { + final RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult currentRecord = + currentDeserializedSegmentValue.find(fromTime, toTime, order, nextIndex); + if (currentRecord != null) { + nextRecord = new VersionedRecord<>(currentRecord.value(), currentRecord.validFrom(), currentRecord.validTo()); + this.nextIndex = order.equals(ResultOrder.ASCENDING) ? currentRecord.index() - 1 : currentRecord.index() + 1; + } + } + // no relevant record can be found in the segment + if (currentRawSegmentValue != null || nextRecord == null || !canSegmentHaveMoreRelevantRecords(nextRecord.timestamp(), nextRecord.validTo().get())) { Review Comment: > To make sure I understand this correctly: > > * If `nextRecord == null` it means we did not find anything and want to go to the next segment (make sense) > * If `currentRawSegmentValue != null` it does not matter if we filled `nextRecord` or not, because the "latest" segment contains at max one record, and thus we need to step into the next segment in any case? > * Similar for `canSegmentHaveMoreRelevantRecords` we check if we exceeded the current segment? Yes. Exactly. When `canSegmentHaveMoreRelevantRecords` returns false, it means that this segment has no more records that fit in query time. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org