mjsax commented on code in PR #14626:
URL: https://github.com/apache/kafka/pull/14626#discussion_r1396723148


##########
streams/src/main/java/org/apache/kafka/streams/state/VersionedRecordIterator.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import java.io.Closeable;
+import java.util.Iterator;
+
+
+/**
+ * Iterator interface of {@link V}.

Review Comment:
   Does `@link` work here? I thought it would be use to generate a HTML link to 
a target class or method, so don't think we can link to `V`? Should it be 
`Iterator interface for {@link VersionedRecord VersionedRecord<V>}.` or 
something like this?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java:
##########
@@ -170,7 +173,30 @@ public boolean isOpen() {
 
     @Override
     public synchronized byte[] get(final Bytes key) {
-        return physicalStore.get(prefixKeyFormatter.addPrefix(key));
+        return get(key, Optional.empty());
+    }
+
+    public synchronized byte[] get(final Bytes key, final Snapshot snapshot) {
+        return get(key, Optional.of(snapshot));
+    }
+
+    private synchronized byte[] get(final Bytes key, final Optional<Snapshot> 
snapshot) {
+        if (snapshot.isPresent()) {
+            try (ReadOptions readOptions = new ReadOptions()) {
+                readOptions.setSnapshot(snapshot.get());
+                return physicalStore.get(prefixKeyFormatter.addPrefix(key), 
readOptions);
+            }
+        } else {
+            return physicalStore.get(prefixKeyFormatter.addPrefix(key));
+        }
+    }
+
+    public Snapshot getSnapshot() {
+        return physicalStore.db.getSnapshot();

Review Comment:
   I think we should push this into `RocksDBStore` class, ie, call 
`physicalStore.getSnapshot()` here, and extend `RocksDBStore` to track all open 
snapshots (similar to how we track open iterators and release all open snapshot 
if the store is closed).



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -263,6 +266,78 @@ public VersionedRecord<byte[]> get(final Bytes key, final 
long asOfTimestamp) {
         return null;
     }
 
+    public VersionedRecordIterator<byte[]> get(final Bytes key, final long 
fromTimestamp, final long toTimestamp, final boolean isAscending) {

Review Comment:
   Given that we want to achieve snapshot semantics, do we need to get some 
lock here that guards both `latestValueStore` and `segmentStores` and allows us 
to get a snapshot for each right away?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java:
##########
@@ -170,7 +173,30 @@ public boolean isOpen() {
 
     @Override
     public synchronized byte[] get(final Bytes key) {
-        return physicalStore.get(prefixKeyFormatter.addPrefix(key));
+        return get(key, Optional.empty());
+    }
+
+    public synchronized byte[] get(final Bytes key, final Snapshot snapshot) {
+        return get(key, Optional.of(snapshot));
+    }
+
+    private synchronized byte[] get(final Bytes key, final Optional<Snapshot> 
snapshot) {
+        if (snapshot.isPresent()) {
+            try (ReadOptions readOptions = new ReadOptions()) {
+                readOptions.setSnapshot(snapshot.get());
+                return physicalStore.get(prefixKeyFormatter.addPrefix(key), 
readOptions);
+            }
+        } else {
+            return physicalStore.get(prefixKeyFormatter.addPrefix(key));
+        }
+    }
+
+    public Snapshot getSnapshot() {
+        return physicalStore.db.getSnapshot();
+    }
+
+    public void releaseSnapshot(final Snapshot snapshot) {
+        physicalStore.db.releaseSnapshot(snapshot);

Review Comment:
   Same. We should add `RocksDBStore#relaseSnapshot` and call it here.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -253,6 +259,86 @@ public VersionedRecord<byte[]> get(final Bytes key, final 
long asOfTimestamp) {
         return null;
     }
 
+    // Visible for testing
+    ValueIterator<VersionedRecord<byte[]>> get(final Bytes key, final long 
fromTimestamp, final long toTimestamp, final boolean isAscending) {
+
+        Objects.requireNonNull(key, "key cannot be null");

Review Comment:
   Seems this was not addressed yet?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -263,6 +266,78 @@ public VersionedRecord<byte[]> get(final Bytes key, final 
long asOfTimestamp) {
         return null;
     }
 
+    public VersionedRecordIterator<byte[]> get(final Bytes key, final long 
fromTimestamp, final long toTimestamp, final boolean isAscending) {
+
+        Objects.requireNonNull(key, "key cannot be null");
+        validateStoreOpen();
+
+        final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>();
+
+        if (toTimestamp < observedStreamTime - historyRetention) {
+            // history retention exceeded. we still check the latest value 
store in case the
+            // latest record version satisfies the timestamp bound, in which 
case it should
+            // still be returned (i.e., the latest record version per key 
never expires).
+            final byte[] rawLatestValueAndTimestamp = 
latestValueStore.get(key);
+            if (rawLatestValueAndTimestamp != null) {
+                final long recordTimestamp = 
LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp);
+                if (recordTimestamp <= toTimestamp) {
+                    // latest value satisfies timestamp bound
+                    queryResults.add(new 
VersionedRecord<>(LatestValueFormatter.getValue(rawLatestValueAndTimestamp), 
recordTimestamp));
+                }
+            }
+
+            // history retention has elapsed and the latest record version (if 
present) does
+            // not satisfy the timestamp bound. return null for 
predictability, even if data
+            // is still present in segments.
+            if (queryResults.size() == 0) {
+                LOG.warn("Returning null for expired get.");
+            }
+            return new 
VersionedRecordIteratorImpl<>(queryResults.listIterator());
+        } else {
+            // take a RocksDB snapshot to return the segments content at the 
query time (in order to guarantee consistency)
+            final Snapshot snapshot = latestValueStore.getSnapshot();
+            // first check the latest value store
+            final byte[] rawLatestValueAndTimestamp = 
latestValueStore.get(key, snapshot);
+            if (rawLatestValueAndTimestamp != null) {
+                final long recordTimestamp = 
LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp);
+                if (recordTimestamp <= toTimestamp) {
+                    queryResults.add(new 
VersionedRecord<>(LatestValueFormatter.getValue(rawLatestValueAndTimestamp), 
recordTimestamp));
+                }
+            }
+
+            // check segment stores
+            // consider the search lower bound as -INF (LONG.MIN_VALUE) to 
find the record that has been inserted before the {@code fromTimestamp}
+            // but is still valid in query specified time interval.
+            final List<LogicalKeyValueSegment> segments = 
segmentStores.segments(Long.MIN_VALUE, toTimestamp, false);
+            for (final LogicalKeyValueSegment segment : segments) {
+                final byte[] rawSegmentValue = segment.get(key, snapshot);

Review Comment:
   I don't think we can pass the `snapshot` we got from latestValueStore into 
segmentStore -- it's two independent RocksDBs.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -263,6 +266,78 @@ public VersionedRecord<byte[]> get(final Bytes key, final 
long asOfTimestamp) {
         return null;
     }
 
+    public VersionedRecordIterator<byte[]> get(final Bytes key, final long 
fromTimestamp, final long toTimestamp, final boolean isAscending) {
+
+        Objects.requireNonNull(key, "key cannot be null");
+        validateStoreOpen();
+
+        final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>();
+
+        if (toTimestamp < observedStreamTime - historyRetention) {
+            // history retention exceeded. we still check the latest value 
store in case the
+            // latest record version satisfies the timestamp bound, in which 
case it should
+            // still be returned (i.e., the latest record version per key 
never expires).
+            final byte[] rawLatestValueAndTimestamp = 
latestValueStore.get(key);
+            if (rawLatestValueAndTimestamp != null) {
+                final long recordTimestamp = 
LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp);
+                if (recordTimestamp <= toTimestamp) {
+                    // latest value satisfies timestamp bound
+                    queryResults.add(new 
VersionedRecord<>(LatestValueFormatter.getValue(rawLatestValueAndTimestamp), 
recordTimestamp));
+                }
+            }
+
+            // history retention has elapsed and the latest record version (if 
present) does
+            // not satisfy the timestamp bound. return null for 
predictability, even if data
+            // is still present in segments.
+            if (queryResults.size() == 0) {
+                LOG.warn("Returning null for expired get.");
+            }
+            return new 
VersionedRecordIteratorImpl<>(queryResults.listIterator());
+        } else {
+            // take a RocksDB snapshot to return the segments content at the 
query time (in order to guarantee consistency)
+            final Snapshot snapshot = latestValueStore.getSnapshot();

Review Comment:
   I think we need to get both snapshots for latestValeuStore and segmentStore 
at the same time ("atomically" protected by a lock) to avoid a race between 
both store.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java:
##########
@@ -218,9 +224,9 @@ private static <R> QueryResult<R> runRangeQuery(
 
     @SuppressWarnings("unchecked")
     private static <R> QueryResult<R> runKeyQuery(final Query<R> query,
-                                                  final PositionBound 
positionBound,
-                                                  final QueryConfig config,
-                                                  final StateStore store) {
+        final PositionBound positionBound,
+        final QueryConfig config,
+        final StateStore store) {

Review Comment:
   revert indention change (similar below)
   
   Do you have some IntelliJ auto-formatting enabled? I would recommend to 
disable it to avoid stuff like this.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -263,6 +266,78 @@ public VersionedRecord<byte[]> get(final Bytes key, final 
long asOfTimestamp) {
         return null;
     }
 
+    public VersionedRecordIterator<byte[]> get(final Bytes key, final long 
fromTimestamp, final long toTimestamp, final boolean isAscending) {
+
+        Objects.requireNonNull(key, "key cannot be null");
+        validateStoreOpen();
+
+        final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>();
+
+        if (toTimestamp < observedStreamTime - historyRetention) {
+            // history retention exceeded. we still check the latest value 
store in case the
+            // latest record version satisfies the timestamp bound, in which 
case it should
+            // still be returned (i.e., the latest record version per key 
never expires).
+            final byte[] rawLatestValueAndTimestamp = 
latestValueStore.get(key);

Review Comment:
   While this is an optimization for which we only query "latestValueStore", I 
think we still need to guard against concurrent writes?
   
   We might also need to so something about `observedStreamTime` (at least 
making it `volatile` -- not sure if sufficient right now).



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java:
##########
@@ -374,9 +380,31 @@ private static <R> QueryResult<R> 
runVersionedKeyQuery(final Query<R> query,
         }
     }
 
+    @SuppressWarnings("unchecked")
+    private static <R> QueryResult<R> runMultiVersionedKeyQuery(final Query<R> 
query,
+        final PositionBound positionBound,

Review Comment:
   nit: fix indention



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -253,6 +259,86 @@ public VersionedRecord<byte[]> get(final Bytes key, final 
long asOfTimestamp) {
         return null;
     }
 
+    // Visible for testing
+    ValueIterator<VersionedRecord<byte[]>> get(final Bytes key, final long 
fromTimestamp, final long toTimestamp, final boolean isAscending) {
+
+        Objects.requireNonNull(key, "key cannot be null");
+        validateStoreOpen();
+
+        final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>();

Review Comment:
   Seems are still materializing the result in-memory? Given the new snapshot 
implementation, I think we can avoid this, by pushing most of the logic to do 
actual `get()` into latest/segment stores into the iterator itself, to execute 
gets lazily, too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to