ableegoldman commented on a change in pull request #9138:
URL: https://github.com/apache/kafka/pull/9138#discussion_r475014135



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
##########
@@ -72,22 +86,40 @@
             searchSpace.iterator(),

Review comment:
       This should be a descending iterator for the reverse case (here and in the other reverse methods in this class).
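A minimal sketch of picking the segment traversal order, assuming the search space is a `List` of segments ordered oldest to newest. `SegmentOrder` and `segmentIterator` are hypothetical names, not part of the Kafka codebase; if the real search space is a `NavigableMap`, its `descendingMap()` view would serve the same purpose.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;

// Hypothetical helper, not Kafka code: chooses the order in which a list of
// segments (assumed to be ordered oldest to newest) is visited.
final class SegmentOrder {
    private SegmentOrder() { }

    static <S> Iterator<S> segmentIterator(final List<S> searchSpace, final boolean forward) {
        if (forward) {
            return searchSpace.iterator();
        }
        // Reverse case: start after the last element and walk backwards.
        final ListIterator<S> it = searchSpace.listIterator(searchSpace.size());
        return new Iterator<S>() {
            @Override
            public boolean hasNext() {
                return it.hasPrevious();
            }

            @Override
            public S next() {
                return it.previous(); // throws NoSuchElementException when exhausted
            }
        };
    }

    public static void main(final String[] args) {
        final Iterator<String> reverse = segmentIterator(Arrays.asList("s1", "s2", "s3"), false);
        while (reverse.hasNext()) {
            System.out.println(reverse.next()); // s3, then s2, then s1
        }
    }
}
```

Wrapping a `ListIterator` avoids copying or reversing the list in place.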

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
##########
@@ -107,26 +109,76 @@ public void shouldIterateOverAllSegments() {
         assertFalse(iterator.hasNext());
     }
 
+    @Test
+    public void shouldIterateBackwardOverAllSegments() {
+        iterator = new SegmentIterator<>(
+            Arrays.asList(segmentOne, segmentTwo).iterator(),
+            hasNextCondition,
+            Bytes.wrap("a".getBytes()),
+            Bytes.wrap("z".getBytes()),
+            false);
+
+        assertTrue(iterator.hasNext());
+        assertEquals("b", new String(iterator.peekNextKey().get()));
+        assertEquals(KeyValue.pair("b", "2"), toStringKeyValue(iterator.next()));
+
+        assertTrue(iterator.hasNext());
+        assertEquals("a", new String(iterator.peekNextKey().get()));
+        assertEquals(KeyValue.pair("a", "1"), toStringKeyValue(iterator.next()));
+
+        assertTrue(iterator.hasNext());
+        assertEquals("d", new String(iterator.peekNextKey().get()));
+        assertEquals(KeyValue.pair("d", "4"), toStringKeyValue(iterator.next()));
+
+        assertTrue(iterator.hasNext());
+        assertEquals("c", new String(iterator.peekNextKey().get()));
+        assertEquals(KeyValue.pair("c", "3"), toStringKeyValue(iterator.next()));

Review comment:
       This doesn't look quite right... shouldn't it be D, C, B, A? I guess in the test we just need to use a descending iterator.
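To make the expected order concrete, here is a small model of the fixture, assuming (as the assertions in the diff suggest) that `segmentOne` holds `a`, `b` and `segmentTwo` holds `c`, `d`. Visiting segments newest first and keys descending within each segment yields D, C, B, A; the diff's current expectation (b, a, d, c) only reverses within each segment. `BackwardOrderDemo` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical model of the test fixture, inferred from the assertions in the diff:
// segmentOne holds a=1, b=2 and segmentTwo holds c=3, d=4.
final class BackwardOrderDemo {
    private BackwardOrderDemo() { }

    static List<NavigableMap<String, String>> fixture() {
        final NavigableMap<String, String> segmentOne = new TreeMap<>();
        segmentOne.put("a", "1");
        segmentOne.put("b", "2");
        final NavigableMap<String, String> segmentTwo = new TreeMap<>();
        segmentTwo.put("c", "3");
        segmentTwo.put("d", "4");
        return Arrays.asList(segmentOne, segmentTwo);
    }

    static List<String> backwardKeys(final List<NavigableMap<String, String>> segments) {
        final List<String> keys = new ArrayList<>();
        // Visit the newest segment first,
        for (int i = segments.size() - 1; i >= 0; i--) {
            // and walk each segment's keys in descending order.
            keys.addAll(segments.get(i).descendingKeySet());
        }
        return keys;
    }

    public static void main(final String[] args) {
        System.out.println(backwardKeys(fixture())); // [d, c, b, a]
    }
}
```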

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
##########
@@ -426,7 +558,12 @@ private void getNextSegmentIterator() {
             setCacheKeyRange(currentSegmentBeginTime(), currentSegmentLastTime());

Review comment:
       GitHub won't let me comment up there, but on line 418, shouldn't we have to decrement the `currentSegmentId` for the reverse case? I'm a little confused because it looks like you have test coverage for the multi-segment case and it seems to pass. Maybe I'm just tired and missing something obvious here.
   For example, in `CachingWindowStoreTest#shouldFetchAndIterateOverKeyBackwardRange` the results seem to go across multiple segments, but it looks like we actually do return the record from the largest segment first?
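A sketch of the segment walk the question is about, assuming segment ids form a contiguous range from `firstSegmentId` to `lastSegmentId`. `SegmentWalk` and `visitOrder` are hypothetical and only mirror the field names in the diff; the point is that the reverse case both starts at the top of the range and decrements.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not the CachingWindowStore code: the order in which
// segment ids between firstSegmentId and lastSegmentId (inclusive) are visited.
final class SegmentWalk {
    private SegmentWalk() { }

    static List<Long> visitOrder(final long firstSegmentId, final long lastSegmentId, final boolean forward) {
        final List<Long> order = new ArrayList<>();
        long currentSegmentId = forward ? firstSegmentId : lastSegmentId;
        // Forward stops after lastSegmentId; reverse stops after firstSegmentId.
        while (forward ? currentSegmentId <= lastSegmentId : currentSegmentId >= firstSegmentId) {
            order.add(currentSegmentId);
            if (forward) {
                ++currentSegmentId;
            } else {
                --currentSegmentId; // the decrement the reverse case needs
            }
        }
        return order;
    }
}
```

For example, `visitOrder(2, 4, false)` returns `[4, 3, 2]`, while the forward call returns `[2, 3, 4]`.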

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
##########
@@ -163,7 +164,17 @@ public void put(final Bytes key, final byte[] value, final long windowStartTimes
     @Deprecated
     @Override
     public WindowStoreIterator<byte[]> fetch(final Bytes key, final long timeFrom, final long timeTo) {
+        return fetch(key, timeFrom, timeTo, true);
+    }
+
+    @Override
+    public WindowStoreIterator<byte[]> backwardFetch(final Bytes key, final Instant from, final Instant to) {
+        final long timeFrom = ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from"));
+        final long timeTo = ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to"));

Review comment:
       I feel like it's a little awkward to have the reverse variations accept time parameters as an `Instant` while the forward versions just use a `long`. I would have thought we could migrate the `long` methods to `Instant` at some point, but I see all these `note, this method must be kept if super#fetch(...) is removed` comments littered throughout the code... so maybe there's a reason for sticking with the `long` overrides in the innermost store layer?
   Did you come across anything that suggested a reason for keeping the `long` flavors? cc @guozhangwang or @mjsax -- why can't we remove these?
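For reference, the layering under discussion (an `Instant` overload that validates once and then delegates to a `long` overload in the inner layer) looks roughly like this. `InstantDelegation`, `toValidatedMillis`, and the `fetch` bodies are hypothetical stand-ins; in the diff, `ApiUtils.validateMillisecondInstant` plays the validation role, and its exact messages and behavior may differ from this sketch.

```java
import java.time.Instant;
import java.util.Objects;

// Hypothetical stand-in for the Instant-to-long validation done in the diff;
// not the Kafka implementation.
final class InstantDelegation {
    private InstantDelegation() { }

    static long toValidatedMillis(final Instant instant, final String name) {
        Objects.requireNonNull(instant, name + " cannot be null");
        try {
            return instant.toEpochMilli(); // throws ArithmeticException on overflow
        } catch (final ArithmeticException e) {
            throw new IllegalArgumentException(name + " cannot be represented as epoch milliseconds", e);
        }
    }

    // Long-based variant, as kept in the innermost store layer.
    static String fetch(final String key, final long timeFrom, final long timeTo) {
        return key + "@[" + timeFrom + "," + timeTo + "]";
    }

    // Instant-based variant: validate once, then delegate to the long variant.
    static String fetch(final String key, final Instant from, final Instant to) {
        return fetch(key, toValidatedMillis(from, "from"), toValidatedMillis(to, "to"));
    }
}
```

With this shape, the `long` overload is the single implementation, so removing it would mean moving the logic into the `Instant` overload rather than simply deleting code.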

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
##########
@@ -119,15 +118,16 @@
      * <p>
      * This iterator must be closed after use.
      *
-     * @param from      the first key in the range
-     * @param to        the last key in the range
-     * @param timeFrom  time range start (inclusive)
-     * @param timeTo    time range end (inclusive)
+     * @param from     the first key in the range
+     * @param to       the last key in the range
+     * @param timeFrom time range start (inclusive)
+     * @param timeTo   time range end (inclusive)
      * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
      * @throws InvalidStateStoreException if the store is not initialized
-     * @throws NullPointerException if one of the given keys is {@code null}
+     * @throws NullPointerException       if one of the given keys is {@code null}
      */
-    @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
+    // note, this method must be kept if super#fetch(...) is removed
+    @SuppressWarnings("deprecation")
     KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);

Review comment:
       Do you know why we have all these ReadOnlyWindowStore methods also declared here in WindowStore? We don't need reverse variations of these, I guess? 🤔

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
##########
@@ -212,17 +316,62 @@ public boolean hasNext() {
         };
     }
 
-    @Override public KeyValueIterator<Windowed<K>, V> fetch(final K from,
-                                                            final K to,
-                                                            final Instant fromTime,
-                                                            final Instant toTime) throws IllegalArgumentException {
+    @Override
+    public KeyValueIterator<Windowed<K>, V> fetch(final K from,
+                                                  final K to,
+                                                  final Instant fromTime,
+                                                  final Instant toTime) throws IllegalArgumentException {
         return fetch(
             from,
-            to, 
+            to,
             ApiUtils.validateMillisecondInstant(fromTime, prepareMillisCheckFailMsgPrefix(fromTime, "fromTime")),
             ApiUtils.validateMillisecondInstant(toTime, prepareMillisCheckFailMsgPrefix(toTime, "toTime")));
     }
 
+    @Override
+    public KeyValueIterator<Windowed<K>, V> backwardFetch(final K from,
+                                                          final K to,
+                                                          final Instant fromTimeInstant,
+                                                          final Instant toTimeInstant) throws IllegalArgumentException {
+        final long timeFrom = ApiUtils.validateMillisecondInstant(fromTimeInstant, prepareMillisCheckFailMsgPrefix(fromTimeInstant, "fromTimeInstant"));
+        final long timeTo = ApiUtils.validateMillisecondInstant(toTimeInstant, prepareMillisCheckFailMsgPrefix(toTimeInstant, "toTimeInstant"));
+        if (!open) {
+            throw new InvalidStateStoreException("Store is not open");
+        }
+        final List<KeyValue<Windowed<K>, V>> results = new ArrayList<>();
+        for (long now = timeFrom; now <= timeTo; now++) {

Review comment:
       Need to flip the loop: for the backward case it should start at `timeTo` and count down to `timeFrom`.
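A minimal sketch of the flipped loop, assuming a simplified stub that stores at most one value per timestamp in a `Map<Long, String>`. `BackwardLoopDemo` is hypothetical and much simpler than `ReadOnlyWindowStoreStub`; only the loop direction is the point.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical simplification of the stub: one value per timestamp.
final class BackwardLoopDemo {
    private BackwardLoopDemo() { }

    static List<String> backwardFetch(final Map<Long, String> data, final long timeFrom, final long timeTo) {
        final List<String> results = new ArrayList<>();
        // Flipped loop: start at timeTo and walk down to timeFrom (both inclusive).
        for (long now = timeTo; now >= timeFrom; now--) {
            final String value = data.get(now);
            if (value != null) {
                results.add(value);
            }
        }
        return results;
    }
}
```

Like the forward loop in the diff, this steps one millisecond at a time, which is fine for a test stub but not for real stores.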

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
##########
@@ -426,7 +558,12 @@ private void getNextSegmentIterator() {
             setCacheKeyRange(currentSegmentBeginTime(), currentSegmentLastTime());
 
             current.close();
-            current = context.cache().range(cacheName, cacheKeyFrom, cacheKeyTo);
+
+            if (forward) {
+                current = context.cache().range(cacheName, cacheKeyFrom, cacheKeyTo);
+            } else {
+                current = context.cache().reverseRange(cacheName, cacheKeyFrom, cacheKeyTo);
+            }
         }
 
         private void setCacheKeyRange(final long lowerRangeEndTime, final long upperRangeEndTime) {

Review comment:
       Just noticed that we use `==` instead of `.equals` down on line 437; can you fix that on the side?
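Line 437 isn't visible in this hunk, so the compared types are an assumption here; this sketch uses boxed `Long` values to show why `==` is fragile for objects: it compares identity, and only values in the guaranteed `Long` cache range (-128 to 127) are sure to share an instance. `EqualityDemo` is a hypothetical name.

```java
import java.util.Objects;

// Illustrative only: reference equality vs value equality for boxed values.
final class EqualityDemo {
    private EqualityDemo() { }

    static boolean sameReference(final Long a, final Long b) {
        return a == b; // compares object identity, not numeric value
    }

    static boolean sameValue(final Long a, final Long b) {
        return Objects.equals(a, b); // null-safe comparison of the wrapped values
    }

    public static void main(final String[] args) {
        final Long x = Long.valueOf(1_000_000L);
        final Long y = Long.valueOf(1_000_000L);
        System.out.println("==     : " + sameReference(x, y)); // not guaranteed to be true
        System.out.println("equals : " + sameValue(x, y));     // always true
    }
}
```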

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
##########
@@ -135,24 +188,74 @@ public void shouldThrowInvalidStateStoreExceptionOnRebalance() {
         store.fetch("key", ofEpochMilli(1), ofEpochMilli(10));
     }
 
+    @Test(expected = InvalidStateStoreException.class)

Review comment:
       nit: use `assertThrows` instead of `@Test(expected = ...)`

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
##########
@@ -419,13 +504,13 @@ Long minTime() {
         }

Review comment:
       As always, GitHub won't let me comment on the line I actually want to (😞), but I think we need a descending iterator for the reverse case in `setRecordIterator` (lines 411 & 413).
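A sketch of a direction-aware record iterator, assuming the records live in a `ConcurrentNavigableMap` (the in-memory store keeps segments in navigable maps, but the exact shape of `setRecordIterator` isn't shown here). `RecordOrder` and `recordIterator` are hypothetical names.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;

// Hypothetical helper; the real setRecordIterator may be structured differently.
final class RecordOrder {
    private RecordOrder() { }

    static <K, V> Iterator<Map.Entry<K, V>> recordIterator(final ConcurrentNavigableMap<K, V> records,
                                                          final boolean forward) {
        // descendingMap() is a reverse-ordered view backed by the same map; no copy is made.
        final ConcurrentNavigableMap<K, V> view = forward ? records : records.descendingMap();
        return view.entrySet().iterator();
    }
}
```

Because `descendingMap()` is a live view, the reverse case keeps the same weakly consistent iteration semantics as the forward one.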

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.java
##########
@@ -123,7 +173,8 @@ public void shouldIterateBothStoreAndCache() {
 
     private MergedSortedCacheWindowStoreKeyValueIterator createIterator(
         final Iterator<KeyValue<Windowed<Bytes>, byte[]>> storeKvs,
-        final Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs
+        final Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs,
+        final boolean forward

Review comment:
       nit: the existing style here is inconsistent with the rest of Streams; the first parameter should be on the same line as the method declaration (with everything else aligned to it).

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
##########
@@ -337,25 +462,32 @@ public synchronized void close() {
 
         private CacheIteratorWrapper(final Bytes key,
                                      final long timeFrom,
-                                     final long timeTo) {
-            this(key, key, timeFrom, timeTo);
+                                     final long timeTo,
+                                     final boolean forward) {
+            this(key, key, timeFrom, timeTo, forward);
         }
 
         private CacheIteratorWrapper(final Bytes keyFrom,
                                      final Bytes keyTo,
                                      final long timeFrom,
-                                     final long timeTo) {
+                                     final long timeTo,
+                                     final boolean forward) {
             this.keyFrom = keyFrom;
             this.keyTo = keyTo;
             this.timeTo = timeTo;
             this.lastSegmentId = cacheFunction.segmentId(Math.min(timeTo, maxObservedTimestamp.get()));
+            this.forward = forward;
 
             this.segmentInterval = cacheFunction.getSegmentInterval();
             this.currentSegmentId = cacheFunction.segmentId(timeFrom);

Review comment:
       I think we should start on the largest segment (largest segment == farthest advanced in time).
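A sketch of choosing the starting segment by direction. It assumes a segment id is `timestamp / segmentInterval`, which is an assumption standing in for the `cacheFunction.segmentId(...)` calls in the diff; `StartSegment` is a hypothetical name. The reverse case reuses the same bound the diff already computes for `lastSegmentId`.

```java
// Hypothetical sketch: segment id assumed to be timestamp / segmentInterval.
final class StartSegment {
    private StartSegment() { }

    static long segmentId(final long timestamp, final long segmentInterval) {
        return timestamp / segmentInterval;
    }

    // Forward iteration starts at the oldest segment that can hold timeFrom;
    // reverse iteration starts at the newest one, capped by the max observed timestamp.
    static long startSegmentId(final long timeFrom,
                               final long timeTo,
                               final long maxObservedTimestamp,
                               final long segmentInterval,
                               final boolean forward) {
        return forward
            ? segmentId(timeFrom, segmentInterval)
            : segmentId(Math.min(timeTo, maxObservedTimestamp), segmentInterval);
    }
}
```

With a 10 ms interval, `timeFrom = 5`, `timeTo = 45`, and a max observed timestamp of 30, forward iteration starts at segment 0 and reverse iteration at segment 3.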




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

