aliehsaeedii commented on code in PR #14957:
URL: https://github.com/apache/kafka/pull/14957#discussion_r1424055579


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##########
@@ -100,34 +130,48 @@ private boolean maybeFillIterator() {
             final byte[] rawSegmentValue = segment.get(key, snapshot);
             if (rawSegmentValue != null) { // this segment contains record(s) with the specified key
                 if (segment.id() == -1) { // this is the latestValueStore
-                    final long recordTimestamp = RocksDBVersionedStore.LatestValueFormatter.getTimestamp(rawSegmentValue);
-                    if (recordTimestamp <= toTime) {
-                        // latest value satisfies timestamp bound
-                        queryResults.add(new VersionedRecord<>(RocksDBVersionedStore.LatestValueFormatter.getValue(rawSegmentValue), recordTimestamp));
-                    }
+                    this.currentRawSegmentValue = rawSegmentValue;
                 } else {
-                    // this segment contains records with the specified key and time range
-                    final List<RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult> searchResults =
-                            RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue).findAll(fromTime, toTime);
-                    for (final RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult searchResult : searchResults) {
-                        queryResults.add(new VersionedRecord<>(searchResult.value(), searchResult.validFrom(), searchResult.validTo()));
-                    }
+                    this.currentDeserializedSegmentValue = new ReadonlyPartiallyDeserializedSegmentValue(rawSegmentValue);
                 }
-            }
-            if (!queryResults.isEmpty()) {
-                break;
+                return true;
             }
         }
-        if (!queryResults.isEmpty()) {
-            // since data is stored in descending order in the segments, create the list in reverse order, if the order is Ascending.
-            this.iterator = order.equals(ResultOrder.ASCENDING) ? queryResults.listIterator(queryResults.size()) : queryResults.listIterator();
-            return true;
-        }
         // if all segments have been processed, release the snapshot
         releaseSnapshot();
         return false;
     }
 
+    private VersionedRecord<byte[]> getNextRecord() {
+        VersionedRecord<byte[]> nextRecord = null;
+        if (currentRawSegmentValue != null) { // this is the latestValueStore
+            final long recordTimestamp = RocksDBVersionedStore.LatestValueFormatter.getTimestamp(currentRawSegmentValue);
+            if (recordTimestamp <= toTime) {
+                final byte[] value = RocksDBVersionedStore.LatestValueFormatter.getValue(currentRawSegmentValue);
+                // latest value satisfies timestamp bound
+                nextRecord = new VersionedRecord<>(value, recordTimestamp);
+            }
+        } else {
+            final RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult currentRecord =
+                    currentDeserializedSegmentValue.find(fromTime, toTime, order, nextIndex);
+            if (currentRecord != null) {
+                nextRecord = new VersionedRecord<>(currentRecord.value(), currentRecord.validFrom(), currentRecord.validTo());
+                this.nextIndex = order.equals(ResultOrder.ASCENDING) ? currentRecord.index() - 1 : currentRecord.index() + 1;
+            }
+        }
+        // no relevant record can be found in the segment
+        if (currentRawSegmentValue != null || nextRecord == null || !canSegmentHaveMoreRelevantRecords(nextRecord.timestamp(), nextRecord.validTo().get())) {

Review Comment:
   > To make sure I understand this correctly:
   > 
   > * If `nextRecord == null`, it means we did not find anything and want to go to the next segment (makes sense).
   > * If `currentRawSegmentValue != null`, it does not matter whether we filled `nextRecord` or not, because the "latest" segment contains at most one record, and thus we need to step into the next segment in any case?
   > * Similarly for `canSegmentHaveMoreRelevantRecords`: we check whether we have exceeded the current segment?
   
   Yes, exactly.
   When `canSegmentHaveMoreRelevantRecords` returns false, it means that this segment has no more records that fall within the query time range.
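   
   To make this concrete for readers following the thread, here is a minimal, self-contained Java sketch of the "advance to the next segment" decision. It is not the `LogicalSegmentIterator` code: the class name, the `Rec` placeholder record, and the simplified bound checks inside `canSegmentHaveMoreRelevantRecords` are illustrative assumptions that only mirror the three bullets quoted above.
   
   ```java
   // Standalone sketch only -- NOT the actual LogicalSegmentIterator code.
   // Class/record names and the simplified bound checks are assumptions.
   import java.util.Optional;
   
   public class SegmentAdvanceSketch {
   
       enum ResultOrder { ASCENDING, DESCENDING }
   
       // Simplified stand-in for VersionedRecord; validTo is empty for the latest value.
       record Rec(byte[] value, long validFrom, Optional<Long> validTo) { }
   
       private final long fromTime;
       private final long toTime;
       private final ResultOrder order;
   
       SegmentAdvanceSketch(final long fromTime, final long toTime, final ResultOrder order) {
           this.fromTime = fromTime;
           this.toTime = toTime;
           this.order = order;
       }
   
       // The three cases from the bullets above: advance to the next segment when
       //  1) the latest-value store was just consumed (it holds at most one record), or
       //  2) nothing relevant was found in the current segment, or
       //  3) the record we found implies the segment has nothing else in range.
       boolean shouldAdvanceToNextSegment(final boolean isLatestValueStore, final Rec nextRecord) {
           return isLatestValueStore
               || nextRecord == null
               || !canSegmentHaveMoreRelevantRecords(nextRecord.validFrom(), nextRecord.validTo().orElse(Long.MAX_VALUE));
       }
   
       // Assumed simplification: records in a segment are ordered by timestamp, so once
       // the current record's validity interval reaches the bound we are iterating
       // towards, the rest of the segment cannot fall within [fromTime, toTime].
       private boolean canSegmentHaveMoreRelevantRecords(final long validFrom, final long validTo) {
           if (order == ResultOrder.ASCENDING) {
               return validTo <= toTime;   // newer records may still start within the range
           }
           return validFrom > fromTime;    // older records may still end within the range
       }
   }
   ```
   
   For example, with `fromTime = 10`, `toTime = 20`, and ascending order, a record with `validFrom = 15` and `validTo = 25` is still returned, but it ends the current segment: any newer record in that segment would become valid at 25 or later, outside the query range.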



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
