aliehsaeedii commented on code in PR #14626: URL: https://github.com/apache/kafka/pull/14626#discussion_r1404610275
########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ########## @@ -263,6 +266,78 @@ public VersionedRecord<byte[]> get(final Bytes key, final long asOfTimestamp) { return null; } + public VersionedRecordIterator<byte[]> get(final Bytes key, final long fromTimestamp, final long toTimestamp, final boolean isAscending) { + + Objects.requireNonNull(key, "key cannot be null"); + validateStoreOpen(); + + final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>(); + + if (toTimestamp < observedStreamTime - historyRetention) { + // history retention exceeded. we still check the latest value store in case the + // latest record version satisfies the timestamp bound, in which case it should + // still be returned (i.e., the latest record version per key never expires). + final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key); + if (rawLatestValueAndTimestamp != null) { + final long recordTimestamp = LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp); + if (recordTimestamp <= toTimestamp) { + // latest value satisfies timestamp bound + queryResults.add(new VersionedRecord<>(LatestValueFormatter.getValue(rawLatestValueAndTimestamp), recordTimestamp)); + } + } + + // history retention has elapsed and the latest record version (if present) does + // not satisfy the timestamp bound. return null for predictability, even if data + // is still present in segments. 
+ if (queryResults.size() == 0) { + LOG.warn("Returning null for expired get."); + } + return new VersionedRecordIteratorImpl<>(queryResults.listIterator()); + } else { + // take a RocksDB snapshot to return the segments content at the query time (in order to guarantee consistency) + final Snapshot snapshot = latestValueStore.getSnapshot(); + // first check the latest value store + final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key, snapshot); + if (rawLatestValueAndTimestamp != null) { + final long recordTimestamp = LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp); + if (recordTimestamp <= toTimestamp) { + queryResults.add(new VersionedRecord<>(LatestValueFormatter.getValue(rawLatestValueAndTimestamp), recordTimestamp)); + } + } + + // check segment stores + // consider the search lower bound as -INF (LONG.MIN_VALUE) to find the record that has been inserted before the {@code fromTimestamp} + // but is still valid in query specified time interval. + final List<LogicalKeyValueSegment> segments = segmentStores.segments(Long.MIN_VALUE, toTimestamp, false); + for (final LogicalKeyValueSegment segment : segments) { + final byte[] rawSegmentValue = segment.get(key, snapshot); Review Comment: > I don't think we can pass the `snapshot` we got from latestValueStore into segmentStore -- it's two independent RocksDBs. It seems like they are actually the same (not two independent RocksDBs). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org