mjsax commented on code in PR #14626:
URL: https://github.com/apache/kafka/pull/14626#discussion_r1411475602


##########
streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java:
##########
@@ -130,9 +129,9 @@ public Optional<Instant> toTime() {
 
     /**
      * The order of the returned records by timestamp.
-     * @return true if the query returns records in ascending order of timestamps
+     * @return UNORDERED, ASCENDING, or DESCENDING if the query returns records in an unordered, ascending, or descending order of timestamps.

Review Comment:
   We should not list `UNORDERED, ASCENDING, or DESCENDING` but phrase it generically -- if we change the enum, I am sure we will not remember to update it here, introducing incorrect docs.
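
   For example, something generic along these lines (exact wording is just a suggestion):
   ```
   @return the {@link ResultOrder order} of the returned records based on their timestamps
   ```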



##########
streams/src/main/java/org/apache/kafka/streams/state/VersionedRecord.java:
##########
@@ -48,7 +49,7 @@ public VersionedRecord(final V value, final long timestamp) {
      * @param timestamp  The timestamp
      * @param validTo    The exclusive upper bound of the validity interval
      */
-    public VersionedRecord(final V value, final long timestamp, final long validTo) {
+    public VersionedRecord(final V value, final long timestamp, final Optional<Long> validTo) {

Review Comment:
   Should we pass in an `Optional<Long> validTo`? Seems if there is no `validTo`, we would use `VersionedRecord(final V value, final long timestamp)` instead of passing an `Optional.empty()`, and thus `validTo` here should just be `long`, and we assign `this.validTo = Optional.of(validTo)`?
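
   A minimal sketch of the two overloads I have in mind (null checks and any other field assignments from the existing class are assumed):
   ```
   public VersionedRecord(final V value, final long timestamp) {
       this.value = value;
       this.timestamp = timestamp;
       this.validTo = Optional.empty(); // no upper bound: this version is still valid
   }

   public VersionedRecord(final V value, final long timestamp, final long validTo) {
       this.value = value;
       this.timestamp = timestamp;
       this.validTo = Optional.of(validTo); // caller passes a plain long
   }
   ```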



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -23,6 +23,7 @@
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
 
 
+import java.security.InvalidParameterException;

Review Comment:
   I don't think that's the right exception (cf the package, and comment below).



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.query.ResultOrder;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.VersionedRecordIterator;
+import org.rocksdb.Snapshot;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Optional;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class LogicalSegmentIterator implements VersionedRecordIterator {
+    protected final ListIterator<LogicalKeyValueSegment> segmentIterator;
+    private final Bytes key;
+    private final Long fromTime;
+    private final Long toTime;
+    private final ResultOrder order;
+    protected ListIterator<VersionedRecord<byte[]>> iterator;

Review Comment:
   Why is this one `protected` but not `private`?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -263,6 +266,78 @@ public VersionedRecord<byte[]> get(final Bytes key, final long asOfTimestamp) {
         return null;
     }
 
+    public VersionedRecordIterator<byte[]> get(final Bytes key, final long fromTimestamp, final long toTimestamp, final boolean isAscending) {

Review Comment:
   Given it's one RocksDB, this comment seems to be void.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -263,6 +266,78 @@ public VersionedRecord<byte[]> get(final Bytes key, final long asOfTimestamp) {
         return null;
     }
 
+    public VersionedRecordIterator<byte[]> get(final Bytes key, final long fromTimestamp, final long toTimestamp, final boolean isAscending) {
+
+        Objects.requireNonNull(key, "key cannot be null");
+        validateStoreOpen();
+
+        final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>();
+
+        if (toTimestamp < observedStreamTime - historyRetention) {
+            // history retention exceeded. we still check the latest value store in case the
+            // latest record version satisfies the timestamp bound, in which case it should
+            // still be returned (i.e., the latest record version per key never expires).
+            final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key);
+            if (rawLatestValueAndTimestamp != null) {
+                final long recordTimestamp = LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp);
+                if (recordTimestamp <= toTimestamp) {
+                    // latest value satisfies timestamp bound
+                    queryResults.add(new VersionedRecord<>(LatestValueFormatter.getValue(rawLatestValueAndTimestamp), recordTimestamp));
+                }
+            }
+
+            // history retention has elapsed and the latest record version (if present) does
+            // not satisfy the timestamp bound. return null for predictability, even if data
+            // is still present in segments.
+            if (queryResults.size() == 0) {
+                LOG.warn("Returning null for expired get.");
+            }
+            return new VersionedRecordIteratorImpl<>(queryResults.listIterator());
+        } else {
+            // take a RocksDB snapshot to return the segments content at the query time (in order to guarantee consistency)
+            final Snapshot snapshot = latestValueStore.getSnapshot();
+            // first check the latest value store
+            final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key, snapshot);
+            if (rawLatestValueAndTimestamp != null) {
+                final long recordTimestamp = LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp);
+                if (recordTimestamp <= toTimestamp) {
+                    queryResults.add(new VersionedRecord<>(LatestValueFormatter.getValue(rawLatestValueAndTimestamp), recordTimestamp));
+                }
+            }
+
+            // check segment stores
+            // consider the search lower bound as -INF (LONG.MIN_VALUE) to find the record that has been inserted before the {@code fromTimestamp}
+            // but is still valid in query specified time interval.
+            final List<LogicalKeyValueSegment> segments = segmentStores.segments(Long.MIN_VALUE, toTimestamp, false);
+            for (final LogicalKeyValueSegment segment : segments) {
+                final byte[] rawSegmentValue = segment.get(key, snapshot);
Review Comment:
   Ah. Thanks for clarifying. My bad.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -136,11 +139,11 @@ public long put(final Bytes key, final byte[] value, final long timestamp) {
         observedStreamTime = Math.max(observedStreamTime, timestamp);
 
         final long foundTs = doPut(
-            versionedStoreClient,
-            observedStreamTime,
-            key,
-            value,
-            timestamp
+                versionedStoreClient,

Review Comment:
   nit: avoid unnecessary reformatting / indentation changes -- similar below.
   
   80% of the diff in this file is just adding/removing spaces... very noisy



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -1016,21 +972,21 @@ private <T extends VersionedStoreSegment> long finishPut(
      * Bytes layout for the value portion of rows stored in the latest value store. The layout is
      * a fixed-size timestamp concatenated with the actual record value.
      */
-    private static final class LatestValueFormatter {
+    protected static final class LatestValueFormatter {

Review Comment:
   It seems package-private would be sufficient (instead of `protected`)?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -244,30 +246,37 @@ private <R> QueryResult<R> runVersionedKeyQuery(final Query<R> query,
             return result;
         }
 
-      @SuppressWarnings("unchecked")
-      private <R> QueryResult<R> runMultiVersionedKeyQuery(final Query<R> query, final PositionBound positionBound, final QueryConfig config) {
-          final QueryResult<R> result;
-          final MultiVersionedKeyQuery<K, V> typedKeyQuery = (MultiVersionedKeyQuery<K, V>) query;
-
-          final Instant fromTime = typedKeyQuery.fromTime().isPresent() ? typedKeyQuery.fromTime().get() : Instant.ofEpochMilli(Long.MIN_VALUE);
-          final Instant toTime = typedKeyQuery.toTime().isPresent() ? typedKeyQuery.toTime().get() : Instant.ofEpochMilli(Long.MAX_VALUE);
-          MultiVersionedKeyQuery<Bytes, byte[]> rawKeyQuery = MultiVersionedKeyQuery.withKey(keyBytes(typedKeyQuery.key()));
-          rawKeyQuery = rawKeyQuery.fromTime(fromTime).toTime(toTime);
-          if (!typedKeyQuery.isAscending()) {
-            rawKeyQuery = rawKeyQuery.withDescendingTimestamps();
-          }
-
-          final QueryResult<VersionedRecordIterator<byte[]>> rawResult = wrapped().query(rawKeyQuery, positionBound, config);
-          if (rawResult.isSuccess()) {
-            final MeteredMultiVersionedKeyQueryIterator<V> typedResult = new MeteredMultiVersionedKeyQueryIterator<V>(rawResult.getResult(), StoreQueryUtils.getDeserializeValue(plainValueSerdes));
-            final QueryResult<MeteredMultiVersionedKeyQueryIterator<V>> typedQueryResult = InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, typedResult);
-            result = (QueryResult<R>) typedQueryResult;
-          } else {
-            // the generic type doesn't matter, since failed queries have no result set.
-            result = (QueryResult<R>) rawResult;
-          }
-          return result;
-      }
+        @SuppressWarnings("unchecked")
+        private <R> QueryResult<R> runMultiVersionedKeyQuery(final Query<R> query, final PositionBound positionBound, final QueryConfig config) {
+            final QueryResult<R> result;
+            final MultiVersionedKeyQuery<K, V> typedKeyQuery = (MultiVersionedKeyQuery<K, V>) query;
+
+            final Instant fromTime = typedKeyQuery.fromTime().isPresent() ? typedKeyQuery.fromTime().get() : Instant.ofEpochMilli(Long.MIN_VALUE);

Review Comment:
   Can we simplify this and the next two lines?
   ```
   final Instant fromTime = typedKeyQuery.fromTime().orElse(Instant.ofEpochMilli(Long.MIN_VALUE));
   final Instant toTime = typedKeyQuery.toTime().orElse(Instant.ofEpochMilli(Long.MAX_VALUE));
   ```



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -266,75 +269,28 @@ public VersionedRecord<byte[]> get(final Bytes key, final long asOfTimestamp) {
         return null;
     }
 
-    public VersionedRecordIterator<byte[]> get(final Bytes key, final long fromTimestamp, final long toTimestamp, final boolean isAscending) {
-
-        Objects.requireNonNull(key, "key cannot be null");
+    @SuppressWarnings("unchecked")

Review Comment:
   Why do we need this suppression?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.query.ResultOrder;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.VersionedRecordIterator;
+import org.rocksdb.Snapshot;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Optional;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class LogicalSegmentIterator implements VersionedRecordIterator {
+    protected final ListIterator<LogicalKeyValueSegment> segmentIterator;
+    private final Bytes key;
+    private final Long fromTime;
+    private final Long toTime;
+    private final ResultOrder order;
+    protected ListIterator<VersionedRecord<byte[]>> iterator;
+
+    // defined for creating/releasing the snapshot. 
+    private LogicalKeyValueSegment snapshotOwner;
+    private Snapshot snapshot;
+
+
+
+    public LogicalSegmentIterator(final ListIterator<LogicalKeyValueSegment> segmentIterator,
+                                  final Bytes key,
+                                  final Long fromTime,
+                                  final Long toTime,
+                                  final ResultOrder order) {
+
+        this.segmentIterator = segmentIterator;
+        this.key = key;
+        this.fromTime = fromTime;
+        this.toTime = toTime;
+        this.iterator = Collections.emptyListIterator();
+        this.order = order;
+        this.snapshot = null;
+        this.snapshotOwner = null;
+    }
+
+    @Override
+    public void close() {
+        // user may refuse consuming all returned records, so release the snapshot when closing the iterator if it is not released yet!
+        releaseSnapshot();
+    }
+
+    @Override
+    public boolean hasNext() {
+        return iterator.hasNext() || maybeFillIterator();
+    }
+
+    @Override
+    public Object next() {
+        if (hasNext()) {
+            return iterator.next();
+        }
+        return null;
+    }
+    private boolean maybeFillIterator() {

Review Comment:
   nit: missing blank line



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.query.ResultOrder;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.VersionedRecordIterator;
+import org.rocksdb.Snapshot;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Optional;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class LogicalSegmentIterator implements VersionedRecordIterator {

Review Comment:
   Should we `extend AbstractIterator` to share some common code?
   
   I remember that we also introduced `ManagedIterator` interface for versioned 
state store -- would we need to implement it, too?
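
   A rough sketch with Kafka's `org.apache.kafka.common.utils.AbstractIterator` (assuming `maybeFillIterator()` stays as-is):
   ```
   public class LogicalSegmentIterator
           extends AbstractIterator<VersionedRecord<byte[]>>
           implements VersionedRecordIterator<byte[]> {

       @Override
       protected VersionedRecord<byte[]> makeNext() {
           if (!iterator.hasNext() && !maybeFillIterator()) {
               return allDone(); // ends iteration; hasNext()/next() then come for free
           }
           return iterator.next();
       }
   }
   ```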



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -244,30 +246,37 @@ private <R> QueryResult<R> runVersionedKeyQuery(final Query<R> query,
             return result;
         }
 
-      @SuppressWarnings("unchecked")
-      private <R> QueryResult<R> runMultiVersionedKeyQuery(final Query<R> query, final PositionBound positionBound, final QueryConfig config) {
-          final QueryResult<R> result;
-          final MultiVersionedKeyQuery<K, V> typedKeyQuery = (MultiVersionedKeyQuery<K, V>) query;
-
-          final Instant fromTime = typedKeyQuery.fromTime().isPresent() ? typedKeyQuery.fromTime().get() : Instant.ofEpochMilli(Long.MIN_VALUE);
-          final Instant toTime = typedKeyQuery.toTime().isPresent() ? typedKeyQuery.toTime().get() : Instant.ofEpochMilli(Long.MAX_VALUE);
-          MultiVersionedKeyQuery<Bytes, byte[]> rawKeyQuery = MultiVersionedKeyQuery.withKey(keyBytes(typedKeyQuery.key()));
-          rawKeyQuery = rawKeyQuery.fromTime(fromTime).toTime(toTime);
-          if (!typedKeyQuery.isAscending()) {
-            rawKeyQuery = rawKeyQuery.withDescendingTimestamps();
-          }
-
-          final QueryResult<VersionedRecordIterator<byte[]>> rawResult = wrapped().query(rawKeyQuery, positionBound, config);
-          if (rawResult.isSuccess()) {
-            final MeteredMultiVersionedKeyQueryIterator<V> typedResult = new MeteredMultiVersionedKeyQueryIterator<V>(rawResult.getResult(), StoreQueryUtils.getDeserializeValue(plainValueSerdes));
-            final QueryResult<MeteredMultiVersionedKeyQueryIterator<V>> typedQueryResult = InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, typedResult);
-            result = (QueryResult<R>) typedQueryResult;
-          } else {
-            // the generic type doesn't matter, since failed queries have no result set.
-            result = (QueryResult<R>) rawResult;
-          }
-          return result;
-      }
+        @SuppressWarnings("unchecked")
+        private <R> QueryResult<R> runMultiVersionedKeyQuery(final Query<R> query, final PositionBound positionBound, final QueryConfig config) {
+            final QueryResult<R> result;
+            final MultiVersionedKeyQuery<K, V> typedKeyQuery = (MultiVersionedKeyQuery<K, V>) query;
+
+            final Instant fromTime = typedKeyQuery.fromTime().isPresent() ? typedKeyQuery.fromTime().get() : Instant.ofEpochMilli(Long.MIN_VALUE);
+            final Instant toTime = typedKeyQuery.toTime().isPresent() ? typedKeyQuery.toTime().get() : Instant.ofEpochMilli(Long.MAX_VALUE);
+            if (fromTime.compareTo(toTime) > 0) {
+                throw new InvalidParameterException("The `fromTime` timestamp must be smaller than the `toTime` timestamp.");

Review Comment:
   Should this be `IllegalArgumentException`?
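
   (`IllegalArgumentException` lives in `java.lang`, so the `java.security.InvalidParameterException` import above could be dropped entirely:)
   ```
   throw new IllegalArgumentException("The `fromTime` timestamp must be smaller than the `toTime` timestamp.");
   ```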



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -266,75 +269,28 @@ public VersionedRecord<byte[]> get(final Bytes key, final long asOfTimestamp) {
         return null;
     }
 
-    public VersionedRecordIterator<byte[]> get(final Bytes key, final long fromTimestamp, final long toTimestamp, final boolean isAscending) {
-
-        Objects.requireNonNull(key, "key cannot be null");
+    @SuppressWarnings("unchecked")
+    protected VersionedRecordIterator<byte[]> get(final Bytes key, final long fromTimestamp, final long toTimestamp, final ResultOrder order) {

Review Comment:
   Can we make it `private` (or package private)?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.query.ResultOrder;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.VersionedRecordIterator;
+import org.rocksdb.Snapshot;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Optional;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class LogicalSegmentIterator implements VersionedRecordIterator {
+    protected final ListIterator<LogicalKeyValueSegment> segmentIterator;
+    private final Bytes key;
+    private final Long fromTime;
+    private final Long toTime;
+    private final ResultOrder order;
+    protected ListIterator<VersionedRecord<byte[]>> iterator;
+
+    // defined for creating/releasing the snapshot. 
+    private LogicalKeyValueSegment snapshotOwner;
+    private Snapshot snapshot;
+
+
+
+    public LogicalSegmentIterator(final ListIterator<LogicalKeyValueSegment> segmentIterator,
+                                  final Bytes key,
+                                  final Long fromTime,
+                                  final Long toTime,
+                                  final ResultOrder order) {
+
+        this.segmentIterator = segmentIterator;
+        this.key = key;
+        this.fromTime = fromTime;
+        this.toTime = toTime;
+        this.iterator = Collections.emptyListIterator();
+        this.order = order;
+        this.snapshot = null;
+        this.snapshotOwner = null;
+    }
+
+    @Override
+    public void close() {
+        // user may refuse consuming all returned records, so release the snapshot when closing the iterator if it is not released yet!
+        releaseSnapshot();
+    }
+
+    @Override
+    public boolean hasNext() {
+        return iterator.hasNext() || maybeFillIterator();

Review Comment:
   Should we add a member `closed` flag and throw if the iterator is already closed? (cf `RocksDbIterator` in the code-base)
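
   Something like this, following the `RocksDbIterator` pattern (names are just a sketch):
   ```
   private volatile boolean open = true;

   @Override
   public void close() {
       releaseSnapshot();
       open = false;
   }

   @Override
   public boolean hasNext() {
       if (!open) {
           throw new InvalidStateStoreException("Iterator has already been closed.");
       }
       return iterator.hasNext() || maybeFillIterator();
   }
   ```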



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.query.ResultOrder;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.VersionedRecordIterator;
+import org.rocksdb.Snapshot;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Optional;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class LogicalSegmentIterator implements VersionedRecordIterator {
+    protected final ListIterator<LogicalKeyValueSegment> segmentIterator;
+    private final Bytes key;
+    private final Long fromTime;
+    private final Long toTime;
+    private final ResultOrder order;
+    protected ListIterator<VersionedRecord<byte[]>> iterator;
+
+    // defined for creating/releasing the snapshot. 
+    private LogicalKeyValueSegment snapshotOwner;
+    private Snapshot snapshot;
+
+
+
+    public LogicalSegmentIterator(final ListIterator<LogicalKeyValueSegment> segmentIterator,
+                                  final Bytes key,
+                                  final Long fromTime,
+                                  final Long toTime,
+                                  final ResultOrder order) {
+
+        this.segmentIterator = segmentIterator;
+        this.key = key;
+        this.fromTime = fromTime;
+        this.toTime = toTime;
+        this.iterator = Collections.emptyListIterator();
+        this.order = order;
+        this.snapshot = null;
+        this.snapshotOwner = null;
+    }
+
+    @Override
+    public void close() {
+        // user may refuse consuming all returned records, so release the snapshot when closing the iterator if it is not released yet!
+        releaseSnapshot();
+    }
+
+    @Override
+    public boolean hasNext() {
+        return iterator.hasNext() || maybeFillIterator();
+    }
+
+    @Override
+    public Object next() {
+        if (hasNext()) {
+            return iterator.next();
+        }
+        return null;

Review Comment:
   I think we should throw an exception for this case (or maybe we can just inherit from `AbstractIterator` instead).
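
   E.g., to follow the standard `Iterator` contract:
   ```
   @Override
   public Object next() {
       if (!hasNext()) {
           throw new NoSuchElementException(); // java.util -- standard Iterator behavior
       }
       return iterator.next();
   }
   ```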



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.query.ResultOrder;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.VersionedRecordIterator;
+import org.rocksdb.Snapshot;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Optional;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class LogicalSegmentIterator implements VersionedRecordIterator {
+    protected final ListIterator<LogicalKeyValueSegment> segmentIterator;
+    private final Bytes key;
+    private final Long fromTime;
+    private final Long toTime;
+    private final ResultOrder order;
+    protected ListIterator<VersionedRecord<byte[]>> iterator;
+
+    // defined for creating/releasing the snapshot. 
+    private LogicalKeyValueSegment snapshotOwner;
+    private Snapshot snapshot;
+
+
+
+    public LogicalSegmentIterator(final ListIterator<LogicalKeyValueSegment> segmentIterator,
+                                  final Bytes key,
+                                  final Long fromTime,
+                                  final Long toTime,
+                                  final ResultOrder order) {
+
+        this.segmentIterator = segmentIterator;
+        this.key = key;
+        this.fromTime = fromTime;
+        this.toTime = toTime;
+        this.iterator = Collections.emptyListIterator();
+        this.order = order;
+        this.snapshot = null;
+        this.snapshotOwner = null;
+    }
+
+    @Override
+    public void close() {
+        // user may refuse consuming all returned records, so release the snapshot when closing the iterator if it is not released yet!
+        releaseSnapshot();
+    }
+
+    @Override
+    public boolean hasNext() {
+        return iterator.hasNext() || maybeFillIterator();
+    }
+
+    @Override
+    public Object next() {
+        if (hasNext()) {
+            return iterator.next();
+        }
+        return null;
+    }
+    private boolean maybeFillIterator() {
+
+        final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>();
+        while (segmentIterator.hasNext()) {
+            final LogicalKeyValueSegment segment = segmentIterator.next();
+
+            if (snapshot == null) { // create the snapshot (this will happen only one time).
+                this.snapshotOwner = segment;
+                // take a RocksDB snapshot to return the segments content at the query time (in order to guarantee consistency)
+                final Lock lock = new ReentrantLock();

Review Comment:
   My original comment about locks was because I thought there are multiple RocksDBs -- but you explained to me that all segments (latest and historical) are a logical abstraction over the same physical RocksDB, so maybe we don't need locks?
   
   This might also simplify the code, because we can get a single snapshot when the iterator is created, and we don't need to track anything else, but use the same snapshot throughout the lifetime of the iterator? (Also, we don't need `snapshotOwner` any longer, as we don't care about the "logical segment" in use, given all of them have the same physical RocksDB instance under the hood.)


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.query.ResultOrder;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.VersionedRecordIterator;
+import org.rocksdb.Snapshot;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Optional;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class LogicalSegmentIterator implements VersionedRecordIterator {
+    protected final ListIterator<LogicalKeyValueSegment> segmentIterator;
+    private final Bytes key;
+    private final Long fromTime;
+    private final Long toTime;
+    private final ResultOrder order;
+    protected ListIterator<VersionedRecord<byte[]>> iterator;
+
+    // defined for creating/releasing the snapshot. 
+    private LogicalKeyValueSegment snapshotOwner;
+    private Snapshot snapshot;
+
+
+
+    public LogicalSegmentIterator(final ListIterator<LogicalKeyValueSegment> segmentIterator,
+                                  final Bytes key,
+                                  final Long fromTime,
+                                  final Long toTime,
+                                  final ResultOrder order) {
+
+        this.segmentIterator = segmentIterator;
+        this.key = key;
+        this.fromTime = fromTime;
+        this.toTime = toTime;
+        this.iterator = Collections.emptyListIterator();
+        this.order = order;
+        this.snapshot = null;
+        this.snapshotOwner = null;
+    }
+
+    @Override
+    public void close() {
+        // user may refuse consuming all returned records, so release the snapshot when closing the iterator if it is not released yet!
+        releaseSnapshot();
+    }
+
+    @Override
+    public boolean hasNext() {
+        return iterator.hasNext() || maybeFillIterator();
+    }
+
+    @Override
+    public Object next() {
+        if (hasNext()) {
+            return iterator.next();
+        }
+        return null;
+    }
+    private boolean maybeFillIterator() {
+
+        final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>();
+        while (segmentIterator.hasNext()) {
+            final LogicalKeyValueSegment segment = segmentIterator.next();
+
+            if (snapshot == null) { // create the snapshot (this will happen only one time).
+                this.snapshotOwner = segment;
+                // take a RocksDB snapshot to return the segments content at the query time (in order to guarantee consistency)
+                final Lock lock = new ReentrantLock();
+                lock.lock();
+                try {
+                    this.snapshot = snapshotOwner.getSnapshot();
+                } finally {
+                    lock.unlock();
+                }
+            }
+
+            final byte[] rawSegmentValue = segment.get(key, snapshot);
+            if (rawSegmentValue != null) { // this segment contains record(s) with the specified key
+                if (segment.id() == -1) { // this is the latestValueStore
+                    final long recordTimestamp = RocksDBVersionedStore.LatestValueFormatter.getTimestamp(rawSegmentValue);
+                    if (recordTimestamp <= toTime) {
+                        // latest value satisfies timestamp bound
+                        queryResults.add(new VersionedRecord<>(RocksDBVersionedStore.LatestValueFormatter.getValue(rawSegmentValue), recordTimestamp));
+                    }
+                } else {
+                    // this segment contains records with the specified key and time range
+                    final List<RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult> searchResults =
+                            RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue).findAll(fromTime, toTime);
+                    for (final RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult searchResult : searchResults) {
+                        queryResults.add(new VersionedRecord<>(searchResult.value(), searchResult.validFrom(), Optional.of(searchResult.validTo())));
+                    }
+                }
+            }
+            if (!queryResults.isEmpty()) {
+                break;
+            }
+        }
+        if (!queryResults.isEmpty()) {
+            if (order.equals(ResultOrder.ASCENDING)) {
+                queryResults.sort((r1, r2) -> (int) (r1.timestamp() - r2.timestamp()));

Review Comment:
   Wondering if we need to sort? Could we also traverse `queryResults` backwards later instead?
   
   It seems we could init with `queryResults.listIterator(queryResults.size())` and use `iterator.previous()` instead of `iterator.next()` inside `next()`?
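
   I.e., roughly (assuming the collected results are already in descending-timestamp order):
   ```
   // in maybeFillIterator(), instead of sorting:
   this.iterator = order.equals(ResultOrder.ASCENDING)
           ? queryResults.listIterator(queryResults.size()) // start past the tail...
           : queryResults.listIterator();

   // hasNext()/next() then dispatch on the order:
   //   ascending:  iterator.hasPrevious() / iterator.previous() // ...and walk backwards
   //   descending: iterator.hasNext()     / iterator.next()
   ```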



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -263,6 +266,78 @@ public VersionedRecord<byte[]> get(final Bytes key, final long asOfTimestamp) {
         return null;
     }
 
+    public VersionedRecordIterator<byte[]> get(final Bytes key, final long fromTimestamp, final long toTimestamp, final boolean isAscending) {
+
+        Objects.requireNonNull(key, "key cannot be null");
+        validateStoreOpen();
+
+        final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>();
+
+        if (toTimestamp < observedStreamTime - historyRetention) {
+            // history retention exceeded. we still check the latest value store in case the
+            // latest record version satisfies the timestamp bound, in which case it should
+            // still be returned (i.e., the latest record version per key never expires).
+            final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key);
+            if (rawLatestValueAndTimestamp != null) {
+                final long recordTimestamp = LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp);
+                if (recordTimestamp <= toTimestamp) {
+                    // latest value satisfies timestamp bound
+                    queryResults.add(new VersionedRecord<>(LatestValueFormatter.getValue(rawLatestValueAndTimestamp), recordTimestamp));
+                }
+            }
+
+            // history retention has elapsed and the latest record version (if present) does
+            // not satisfy the timestamp bound. return null for predictability, even if data
+            // is still present in segments.
+            if (queryResults.size() == 0) {
+                LOG.warn("Returning null for expired get.");
+            }
+            return new VersionedRecordIteratorImpl<>(queryResults.listIterator());
+        } else {
+            // take a RocksDB snapshot to return the segments content at the query time (in order to guarantee consistency)
+            final Snapshot snapshot = latestValueStore.getSnapshot();

Review Comment:
   Seems this comment is void given it's a single RocksDB. My bad.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -266,75 +269,28 @@ public VersionedRecord<byte[]> get(final Bytes key, final long asOfTimestamp) {
         return null;
     }
 
-    public VersionedRecordIterator<byte[]> get(final Bytes key, final long fromTimestamp, final long toTimestamp, final boolean isAscending) {
-
-        Objects.requireNonNull(key, "key cannot be null");
+    @SuppressWarnings("unchecked")
+    protected VersionedRecordIterator<byte[]> get(final Bytes key, final long fromTimestamp, final long toTimestamp, final ResultOrder order) {
         validateStoreOpen();
 
-        final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>();
-
         if (toTimestamp < observedStreamTime - historyRetention) {
             // history retention exceeded. we still check the latest value store in case the
             // latest record version satisfies the timestamp bound, in which case it should
             // still be returned (i.e., the latest record version per key never expires).
-            final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key);
-            if (rawLatestValueAndTimestamp != null) {
-                final long recordTimestamp = LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp);
-                if (recordTimestamp <= toTimestamp) {
-                    // latest value satisfies timestamp bound
-                    queryResults.add(new VersionedRecord<>(LatestValueFormatter.getValue(rawLatestValueAndTimestamp), recordTimestamp));
-                }
-            }
-
-            // history retention has elapsed and the latest record version (if present) does
-            // not satisfy the timestamp bound. return null for predictability, even if data
-            // is still present in segments.
-            if (queryResults.size() == 0) {
-                LOG.warn("Returning null for expired get.");
-            }
-            return new VersionedRecordIteratorImpl<>(queryResults.listIterator());
+            return new LogicalSegmentIterator(Collections.singletonList(latestValueStore).listIterator(), key, fromTimestamp, toTimestamp, order);

Review Comment:
   I love that you added `LogicalSegmentIterator` -- very clean and elegant!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

