ableegoldman commented on a change in pull request #9138:
URL: https://github.com/apache/kafka/pull/9138#discussion_r480481060



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
##########
@@ -338,25 +452,36 @@ public synchronized void close() {
 
         private CacheIteratorWrapper(final Bytes key,
                                      final long timeFrom,
-                                     final long timeTo) {
-            this(key, key, timeFrom, timeTo);
+                                     final long timeTo,
+                                     final boolean forward) {
+            this(key, key, timeFrom, timeTo, forward);
         }
 
         private CacheIteratorWrapper(final Bytes keyFrom,
                                      final Bytes keyTo,
                                      final long timeFrom,
-                                     final long timeTo) {
+                                     final long timeTo,
+                                     final boolean forward) {
             this.keyFrom = keyFrom;
             this.keyTo = keyTo;
             this.timeTo = timeTo;
-            this.lastSegmentId = cacheFunction.segmentId(Math.min(timeTo, maxObservedTimestamp.get()));
+            this.forward = forward;
 
             this.segmentInterval = cacheFunction.getSegmentInterval();
-            this.currentSegmentId = cacheFunction.segmentId(timeFrom);
 
-            setCacheKeyRange(timeFrom, currentSegmentLastTime());
+            if (forward) {
+                this.lastSegmentId = cacheFunction.segmentId(Math.min(timeTo, maxObservedTimestamp.get()));
+                this.currentSegmentId = cacheFunction.segmentId(timeFrom);
 
-            this.current = context.cache().range(cacheName, cacheKeyFrom, cacheKeyTo);
+                setCacheKeyRange(timeFrom, currentSegmentLastTime());
+                this.current = context.cache().range(cacheName, cacheKeyFrom, cacheKeyTo);
+            } else {
+                this.currentSegmentId = cacheFunction.segmentId(Math.min(timeTo, maxObservedTimestamp.get()));
+                this.lastSegmentId = cacheFunction.segmentId(timeFrom);
+
+                setCacheKeyRange(currentSegmentBeginTime(), Math.min(timeTo, maxObservedTimestamp.get()));

Review comment:
       This looks right to me. In the iterator constructor, we normally start
from `timeFrom` (the minimum time) and advance to the end of the current
segment; that is what the "cache key range" defines: the time range covered
within the current segment. When iterating backwards, the current segment is
instead the largest segment, so the lower bound of the cache key range is the
beginning timestamp of the current (largest) segment, and the upper bound is
the maximum timestamp of the backwards fetch, i.e.
`Math.min(timeTo, maxObservedTimestamp.get())`. Does that make sense?
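
To make the two cases concrete, here is a minimal standalone sketch of how the initial range could be computed in each direction. It is not the code in `CachingWindowStore`: the class `SegmentRangeSketch`, the method `initialRange`, and the arithmetic `segmentId = timestamp / segmentInterval` are assumptions made for illustration only. The backward branch mirrors the hunk above as written, so its lower bound is the segment's beginning and is not clamped to `timeFrom`.

```java
// Hypothetical illustration only; names and segment arithmetic are assumptions,
// not the actual CachingWindowStore / cacheFunction implementation.
final class SegmentRangeSketch {

    // Assumed mapping from a timestamp to its segment: integer division by the interval.
    static long segmentId(final long timestamp, final long segmentInterval) {
        return timestamp / segmentInterval;
    }

    // Returns {lowerTime, upperTime} for the first segment the iterator visits.
    static long[] initialRange(final long timeFrom,
                               final long timeTo,
                               final long maxObservedTimestamp,
                               final long segmentInterval,
                               final boolean forward) {
        if (forward) {
            // Forward: start at timeFrom and run to the end of the smallest segment.
            final long currentSegmentId = segmentId(timeFrom, segmentInterval);
            final long segmentLastTime = currentSegmentId * segmentInterval + segmentInterval - 1;
            return new long[] {timeFrom, Math.min(timeTo, segmentLastTime)};
        } else {
            // Backward: the current segment is the largest one, so the range runs
            // from that segment's beginning up to the maximum fetch timestamp.
            final long upperTime = Math.min(timeTo, maxObservedTimestamp);
            final long currentSegmentId = segmentId(upperTime, segmentInterval);
            final long segmentBeginTime = currentSegmentId * segmentInterval;
            return new long[] {segmentBeginTime, upperTime};
        }
    }
}
```

For example, with a segment interval of 100, `timeFrom = 150`, `timeTo = 950`, and a maximum observed timestamp of 720, the forward iterator would start with the range 150 to 199, while the backward iterator would start with 700 to 720.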




