showuon commented on code in PR #11211:
URL: https://github.com/apache/kafka/pull/11211#discussion_r852555527


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java:
##########
@@ -234,7 +275,11 @@ public void put(final Bytes key,
 
     @Override
     public byte[] get(final Bytes key) {
-        final S segment = segments.getSegmentForTimestamp(keySchema.segmentTimestamp(key));
+        final long timestampFromKey = keySchema.segmentTimestamp(key);
+        // check if timestamp is expired
+        if (timestampFromKey < observedStreamTime - retentionPeriod + 1)
+            return null;

Review Comment:
   We could add a debug log here noting that the value has expired and is 
therefore not returned. The same applies to the other expiry checks.
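
   A minimal sketch of what such a debug log could look like, written outside 
the actual store class. It uses `java.util.logging` only to stay self-contained 
(Kafka Streams itself logs through SLF4J). The class name `ExpiryCheckSketch`, 
the helpers `isExpired` and `getOrNull`, and the message wording are all 
hypothetical and not part of the PR.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-alone sketch; not the actual AbstractRocksDBSegmentedBytesStore code.
final class ExpiryCheckSketch {
    private static final Logger LOG = Logger.getLogger(ExpiryCheckSketch.class.getName());

    // Mirrors the condition in the diff:
    // timestampFromKey < observedStreamTime - retentionPeriod + 1
    static boolean isExpired(final long timestampFromKey,
                             final long observedStreamTime,
                             final long retentionPeriod) {
        return timestampFromKey < observedStreamTime - retentionPeriod + 1;
    }

    // Returns null for expired keys and logs, at debug (FINE) level, why nothing is returned.
    static byte[] getOrNull(final long timestampFromKey,
                            final long observedStreamTime,
                            final long retentionPeriod,
                            final byte[] storedValue) {
        if (isExpired(timestampFromKey, observedStreamTime, retentionPeriod)) {
            LOG.log(Level.FINE,
                "Skipping expired record: timestamp {0} is before earliest retained timestamp {1}",
                new Object[] {timestampFromKey, observedStreamTime - retentionPeriod + 1});
            return null;
        }
        return storedValue;
    }
}
```

   Keeping the check in one helper would also let the other read paths log the 
same message instead of repeating the condition.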



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java:
##########
@@ -157,43 +157,34 @@ public void shouldPutAndFetch() {
         try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
             Bytes.wrap(keyA.getBytes()), 0, windows[2].start())) {
 
-            final List<KeyValue<Windowed<String>, Long>> expected = asList(
-                KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
-                KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L)
-            );
+            // all records expired as actual from is 59001 and to is 1000

Review Comment:
   Could you make it clear where the 59001 comes from? (I think it's observed 
time 60000 - retention 1000 + 1, but please state that in the comment.) 
We can tell that the emptyIterator is returned because the records are expired, 
but it is unclear where the 59001 comes from. Thanks.
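
   The arithmetic I have in mind, as a sketch. It assumes that `getActualFrom` 
clamps the requested `from` to the earliest retained timestamp, which this hunk 
does not show. The values 60000 and 1000 are my reading of the test setup, and 
the class and method names are hypothetical.

```java
// Hypothetical sketch; the real getActualFrom body is not shown in this hunk.
final class ActualFromSketch {
    // Earliest timestamp still inside the retention window.
    static long earliestRetained(final long observedStreamTime, final long retentionPeriod) {
        return observedStreamTime - retentionPeriod + 1;
    }

    // Assumption: the effective lower bound is the later of the requested
    // `from` and the earliest retained timestamp.
    static long actualFrom(final long from,
                           final long observedStreamTime,
                           final long retentionPeriod) {
        return Math.max(from, earliestRetained(observedStreamTime, retentionPeriod));
    }

    public static void main(final String[] args) {
        // Requested range is [0, 1000]; observed stream time 60000, retention 1000.
        final long actualFrom = actualFrom(0L, 60_000L, 1_000L);
        System.out.println("actualFrom = " + actualFrom);              // actualFrom = 59001
        System.out.println("empty fetch = " + (1_000L < actualFrom));  // empty fetch = true
    }
}
```

   If that reading is right, the test comment could spell out the same 
subtraction so the 59001 is not a magic number.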



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java:
##########
@@ -157,43 +157,34 @@ public void shouldPutAndFetch() {
         try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
             Bytes.wrap(keyA.getBytes()), 0, windows[2].start())) {
 
-            final List<KeyValue<Windowed<String>, Long>> expected = asList(
-                KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
-                KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L)
-            );
+            // all records expired as actual from is 59001 and to is 1000
+            final List<KeyValue<Windowed<String>, Long>> expected = Collections.emptyList();
 
             assertEquals(expected, toList(values));
         }
 
         try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
             Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
 
-            final List<KeyValue<Windowed<String>, Long>> expected = asList(
-                KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
-                KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
-                KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L)
-            );
+            // all records expired as actual from is 59001 and to is 1000
+            final List<KeyValue<Windowed<String>, Long>> expected = Collections.emptyList();

Review Comment:
   Thanks for making the tests work as expected. But I'm afraid this changes 
the original goal of the test, and it might no longer catch errors when things 
go wrong. Could we have 2 tests: one with a large retention period, so that we 
keep the original assertions, and another with what you modified here? 
   
   The same comment applies to the other tests. Thanks.
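
   To illustrate the split, here is a toy sketch. `ToyRetentionStore` is a 
hypothetical in-memory stand-in, not a Kafka class, and it ignores keys and 
segments entirely. The timestamps, the values, and the 120000 retention are 
made up for illustration, and the clamping in `fetch` is my assumption about 
what `getActualFrom` does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical toy store used only to illustrate the two-test split.
final class ToyRetentionStore {
    private final long retentionPeriod;
    private long observedStreamTime = -1L;
    private final TreeMap<Long, Long> valuesByTimestamp = new TreeMap<>();

    ToyRetentionStore(final long retentionPeriod) {
        this.retentionPeriod = retentionPeriod;
    }

    void put(final long timestamp, final long value) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        valuesByTimestamp.put(timestamp, value);
    }

    // Returns values with timestamps in [from, to] that are still within retention.
    List<Long> fetch(final long from, final long to) {
        final long actualFrom = Math.max(from, observedStreamTime - retentionPeriod + 1);
        if (to < actualFrom) {
            return new ArrayList<>();
        }
        return new ArrayList<>(valuesByTimestamp.subMap(actualFrom, true, to, true).values());
    }
}

final class RetentionTestsSketch {
    // Writes three early records plus one late record that advances stream time
    // to 60000, then fetches the range [0, 1000].
    static List<Long> fetchAfterLateRecord(final long retentionPeriod) {
        final ToyRetentionStore store = new ToyRetentionStore(retentionPeriod);
        store.put(0L, 10L);
        store.put(500L, 50L);
        store.put(1_000L, 100L);
        store.put(60_000L, 500L);
        return store.fetch(0L, 1_000L);
    }

    public static void main(final String[] args) {
        // Test 1: long retention keeps the original expectation (records are returned).
        System.out.println(fetchAfterLateRecord(120_000L)); // [10, 50, 100]
        // Test 2: short retention covers the new expiry behaviour (nothing is returned).
        System.out.println(fetchAfterLateRecord(1_000L));   // []
    }
}
```

   The first case still fails if fetch stops returning live records, while the 
second case pins down the expiry behaviour added in this PR.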



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java:
##########
@@ -87,23 +90,35 @@ public KeyValueIterator<Bytes, byte[]> backwardFetch(final Bytes key,
         return fetch(key, from, to, false);
     }
 
+
     KeyValueIterator<Bytes, byte[]> fetch(final Bytes key,
                                           final long from,
                                           final long to,
                                           final boolean forward) {
-        final List<S> searchSpace = keySchema.segmentsToSearch(segments, from, to, forward);
 
-        final Bytes binaryFrom = keySchema.lowerRangeFixedSize(key, from);
+        final long actualFrom = getActualFrom(from);
+
+        if (keySchema instanceof WindowKeySchema && to < actualFrom) {
+            return KeyValueIterators.emptyIterator();

Review Comment:
   Could you help me understand why we only return `emptyIterator` when it's 
`WindowKeySchema`? Why not other types of schema?



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java:
##########
@@ -181,15 +189,16 @@ public void testRolling() {
             ),
             segmentDirs(baseDir)
         );
-
+        // expired record
         assertEquals(
-            new HashSet<>(Collections.singletonList("zero")),
+            new HashSet<>(Collections.emptyList()),

Review Comment:
   Could we make the retention period longer to keep the original test? I'm 
afraid that with this change the test will no longer catch errors when the code 
changes unexpectedly. I'm thinking we can have 2 tests: one with a long 
retention period, and the other with the current setting.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
