guozhangwang commented on a change in pull request #9138:
URL: https://github.com/apache/kafka/pull/9138#discussion_r480469688



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
##########
@@ -136,34 +174,64 @@
      * <p>
      * This iterator must be closed after use.
      *
-     * @param from      the first key in the range
-     * @param to        the last key in the range
-     * @param fromTime  time range start (inclusive)
-     * @param toTime    time range end (inclusive)
-     * @return an iterator over windowed key-value pairs {@code <Windowed<K>, 
value>}
+     * @param from     the first key in the range
+     * @param to       the last key in the range
+     * @param timeFrom time range start (inclusive), where iteration starts.
+     * @param timeTo   time range end (inclusive), where iteration ends.
+     * @return an iterator over windowed key-value pairs {@code <Windowed<K>, 
value>}, from beginning to end of time.
      * @throws InvalidStateStoreException if the store is not initialized
-     * @throws NullPointerException If {@code null} is used for any key.
-     * @throws IllegalArgumentException if duration is negative or can't be 
represented as {@code long milliseconds}
+     * @throws NullPointerException       If {@code null} is used for any key.
+     * @throws IllegalArgumentException   if duration is negative or can't be 
represented as {@code long milliseconds}
      */
-    KeyValueIterator<Windowed<K>, V> fetch(K from, K to, Instant fromTime, 
Instant toTime)
+    KeyValueIterator<Windowed<K>, V> fetch(K from, K to, Instant timeFrom, 
Instant timeTo)
         throws IllegalArgumentException;
 
     /**
-    * Gets all the key-value pairs in the existing windows.
-    *
-    * @return an iterator over windowed key-value pairs {@code <Windowed<K>, 
value>}
-    * @throws InvalidStateStoreException if the store is not initialized
-    */
+     * Get all the key-value pairs in the given key range and time range from 
all the existing windows
+     * in backward order with respect to time (from end to beginning of time).
+     * <p>
+     * This iterator must be closed after use.
+     *
+     * @param from     the first key in the range
+     * @param to       the last key in the range
+     * @param timeFrom time range start (inclusive), where iteration ends.
+     * @param timeTo   time range end (inclusive), where iteration starts.
+     * @return an iterator over windowed key-value pairs {@code <Windowed<K>, 
value>}, from end to beginning of time.
+     * @throws InvalidStateStoreException if the store is not initialized
+     * @throws NullPointerException       If {@code null} is used for any key.
+     * @throws IllegalArgumentException   if duration is negative or can't be 
represented as {@code long milliseconds}
+     */
+    KeyValueIterator<Windowed<K>, V> backwardFetch(K from, K to, Instant 
timeFrom, Instant timeTo)

Review comment:
       This is out of the scope of this PR, but I'd like to point out that the 
current IQ does not actually obey the ordering when there are multiple local 
stores hosted on that instance. For example, if there are two stores from two 
tasks hosting keys {1, 3} and {2, 4}, then a range query over keys [1, 4] would 
return them in the order `1,3,2,4` rather than `1,2,3,4`, since it simply loops 
over the stores one after another. This would be the case for both forward and 
backward fetches over a key range and a time range.
   
   For a single-key time-range fetch, of course, there's no such issue.
   
   I think it is worth documenting this for now until we have a fix (and actually 
we are going to propose something soon).
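
   To make the ordering issue concrete, here is a minimal sketch. It does not use the actual IQ code: plain sorted lists stand in for the per-task local stores, and the class and method names (`StoreOrderingSketch`, `concatenated`, `merged`) are hypothetical. `concatenated` mimics looping over the stores one after another, while `merged` shows one possible fix, a k-way merge, which is only an assumption about what a fix could look like.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical illustration only: sorted lists stand in for per-task local stores.
final class StoreOrderingSketch {

    // Mimics the behaviour described above: drain each store fully before the next one.
    static List<Integer> concatenated(final List<List<Integer>> stores) {
        final List<Integer> result = new ArrayList<>();
        for (final List<Integer> store : stores) {
            result.addAll(store);
        }
        return result;
    }

    // One possible alternative (an assumption, not the proposed fix): a k-way merge
    // that always emits the smallest (forward) or largest (backward) head key.
    static List<Integer> merged(final List<List<Integer>> stores, final boolean forward) {
        final Comparator<Integer> order = forward
            ? Comparator.<Integer>naturalOrder()
            : Comparator.<Integer>reverseOrder();
        // Each queue entry is {key, storeIndex, positionInStore}.
        final PriorityQueue<int[]> heads =
            new PriorityQueue<>((a, b) -> order.compare(a[0], b[0]));
        for (int i = 0; i < stores.size(); i++) {
            final List<Integer> store = stores.get(i);
            if (!store.isEmpty()) {
                final int start = forward ? 0 : store.size() - 1;
                heads.add(new int[] {store.get(start), i, start});
            }
        }
        final List<Integer> result = new ArrayList<>();
        while (!heads.isEmpty()) {
            final int[] head = heads.poll();
            result.add(head[0]);
            final List<Integer> store = stores.get(head[1]);
            final int next = forward ? head[2] + 1 : head[2] - 1;
            if (next >= 0 && next < store.size()) {
                heads.add(new int[] {store.get(next), head[1], next});
            }
        }
        return result;
    }
}
```

   With stores `[1, 3]` and `[2, 4]`, `concatenated` yields `1,3,2,4`, whereas `merged` yields `1,2,3,4` going forward and `4,3,2,1` going backward.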

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
##########
@@ -33,11 +33,11 @@
     /**
      * Get the value of key from a window.
      *
-     * @param key       the key to fetch
-     * @param time      start timestamp (inclusive) of the window
+     * @param key  the key to fetch
+     * @param time start timestamp (inclusive) of the window
      * @return The value or {@code null} if no value is found in the window
      * @throws InvalidStateStoreException if the store is not initialized
-     * @throws NullPointerException If {@code null} is used for any key.
+     * @throws NullPointerException       If {@code null} is used for any key.

Review comment:
       nit: is this intentional? Also I'd suggest we do not use capitalized 
`If` to be consistent with the above line, ditto elsewhere below.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
##########
@@ -150,13 +185,25 @@
      * @return an iterator over windowed key-value pairs {@code <Windowed<K>, 
value>}
      * @throws InvalidStateStoreException if the store is not initialized
      */
-    @SuppressWarnings("deprecation") // note, this method must be kept if 
super#fetchAll(...) is removed
+    // note, this method must be kept if super#fetchAll(...) is removed
+    @SuppressWarnings("deprecation")
     KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);
 
     @Override
-    default KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, 
final Instant to) {
+    default KeyValueIterator<Windowed<K>, V> fetchAll(final Instant timeFrom, 
final Instant timeTo) {
         return fetchAll(
-            ApiUtils.validateMillisecondInstant(from, 
prepareMillisCheckFailMsgPrefix(from, "from")),
-            ApiUtils.validateMillisecondInstant(to, 
prepareMillisCheckFailMsgPrefix(to, "to")));
+            ApiUtils.validateMillisecondInstant(timeFrom, 
prepareMillisCheckFailMsgPrefix(timeFrom, "timeFrom")),
+            ApiUtils.validateMillisecondInstant(timeTo, 
prepareMillisCheckFailMsgPrefix(timeTo, "timeTo")));
+    }
+
+    default KeyValueIterator<Windowed<K>, V> backwardFetchAll(final long 
timeFrom, final long timeTo) {

Review comment:
       Since we are going to remove the deprecated overloads with primitive long in 
the future, I think we do not need to expose a default function here, but can just 
provide a default impl of the function in 204 below that throws 
`UnsupportedOperationException`?
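
   A rough sketch of what I have in mind, assuming the function at 204 is the `Instant`-based `backwardFetchAll`. The interface below (`BackwardFetchSketch`) is a hypothetical stand-in, not the real `WindowStore`, and `Iterable<T>` replaces `KeyValueIterator` to keep it self-contained:

```java
import java.time.Instant;

// Hypothetical stand-in for the store interface; not the actual Kafka Streams API.
interface BackwardFetchSketch<T> {

    // Existing forward API that every implementation must provide.
    Iterable<T> fetchAll(Instant timeFrom, Instant timeTo);

    // Newly added API: only the Instant-based variant is exposed, with a default
    // body so that existing implementations keep compiling until they opt in.
    default Iterable<T> backwardFetchAll(final Instant timeFrom, final Instant timeTo) {
        throw new UnsupportedOperationException();
    }
}
```

   This way no new primitive-long overload has to be added and later deprecated.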

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
##########
@@ -338,25 +452,36 @@ public synchronized void close() {
 
         private CacheIteratorWrapper(final Bytes key,
                                      final long timeFrom,
-                                     final long timeTo) {
-            this(key, key, timeFrom, timeTo);
+                                     final long timeTo,
+                                     final boolean forward) {
+            this(key, key, timeFrom, timeTo, forward);
         }
 
         private CacheIteratorWrapper(final Bytes keyFrom,
                                      final Bytes keyTo,
                                      final long timeFrom,
-                                     final long timeTo) {
+                                     final long timeTo,
+                                     final boolean forward) {
             this.keyFrom = keyFrom;
             this.keyTo = keyTo;
             this.timeTo = timeTo;
-            this.lastSegmentId = cacheFunction.segmentId(Math.min(timeTo, 
maxObservedTimestamp.get()));
+            this.forward = forward;
 
             this.segmentInterval = cacheFunction.getSegmentInterval();
-            this.currentSegmentId = cacheFunction.segmentId(timeFrom);
 
-            setCacheKeyRange(timeFrom, currentSegmentLastTime());
+            if (forward) {
+                this.lastSegmentId = cacheFunction.segmentId(Math.min(timeTo, 
maxObservedTimestamp.get()));
+                this.currentSegmentId = cacheFunction.segmentId(timeFrom);
 
-            this.current = context.cache().range(cacheName, cacheKeyFrom, 
cacheKeyTo);
+                setCacheKeyRange(timeFrom, currentSegmentLastTime());
+                this.current = context.cache().range(cacheName, cacheKeyFrom, 
cacheKeyTo);
+            } else {
+                this.currentSegmentId = 
cacheFunction.segmentId(Math.min(timeTo, maxObservedTimestamp.get()));
+                this.lastSegmentId = cacheFunction.segmentId(timeFrom);
+
+                setCacheKeyRange(currentSegmentBeginTime(), Math.min(timeTo, 
maxObservedTimestamp.get()));

Review comment:
       Why is this call different based on the `forward` boolean? It's not 
clear to me. cc @ableegoldman @lct45 could you double check?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
##########
@@ -67,14 +70,22 @@ public Bytes peekNextKey() {
     public boolean hasNext() {
         boolean hasNext = false;
         while ((currentIterator == null || !(hasNext = 
hasNextConditionHasNext()) || !currentSegment.isOpen())
-                && segments.hasNext()) {
+            && segments.hasNext()) {
             close();
             currentSegment = segments.next();
             try {
                 if (from == null || to == null) {
-                    currentIterator = currentSegment.all();
+                    if (forward) {
+                        currentIterator = currentSegment.all();
+                    } else {
+                        currentIterator = currentSegment.reverseAll();

Review comment:
       This is not directly related to this PR, but it makes me wonder: why 
do we keep a separate `range / all` in the extended `Segment` interface? Should we 
just remove them? Now that we've added the reverse ones only in the parent 
interface, it seems to me the original ones on `Segment` are not necessary.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
##########
@@ -117,12 +118,20 @@ public void openExisting(final ProcessorContext context, 
final long streamTime)
     }
 
     @Override
-    public List<S> segments(final long timeFrom, final long timeTo) {
+    public List<S> segments(final long timeFrom, final long timeTo, final 
boolean forward) {
         final List<S> result = new ArrayList<>();
-        final NavigableMap<Long, S> segmentsInRange = segments.subMap(
-            segmentId(timeFrom), true,
-            segmentId(timeTo), true
-        );
+        final NavigableMap<Long, S> segmentsInRange;
+        if (forward) {

Review comment:
       nit: we can just call `subMap` out of the condition and only call 
`descendingMap()` based on the condition.
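
   Something along these lines, as a simplified sketch: the class name `SegmentsRangeSketch` is hypothetical, the segment ids are passed in directly instead of going through `segmentId(...)`, and any per-segment filtering done by the real method is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;

// Hypothetical, simplified version of the segments(...) lookup.
final class SegmentsRangeSketch {

    static <S> List<S> segments(final NavigableMap<Long, S> segments,
                                final long fromId,
                                final long toId,
                                final boolean forward) {
        // The inclusive sub-range is the same for both directions...
        final NavigableMap<Long, S> inRange = segments.subMap(fromId, true, toId, true);
        // ...only the iteration order depends on the flag.
        final NavigableMap<Long, S> ordered = forward ? inRange : inRange.descendingMap();
        return new ArrayList<>(ordered.values());
    }
}
```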

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
##########
@@ -192,4 +192,3 @@ public void close() {
         storeIterator.close();
     }
 }
-

Review comment:
       Is this intentional? We usually keep a newline at the end of the file, 
since some IDEs complain otherwise.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
##########
@@ -271,27 +345,68 @@ public synchronized void put(final Bytes key,
         final PeekingKeyValueIterator<Bytes, LRUCacheEntry> 
filteredCacheIterator =
             new FilteredCacheIterator(cacheIterator, hasNextCondition, 
cacheFunction);
         return new MergedSortedCacheWindowStoreKeyValueIterator(
-                filteredCacheIterator,
-                underlyingIterator,
-                bytesSerdes,
-                windowSize,
-                cacheFunction
+            filteredCacheIterator,
+            underlyingIterator,
+            bytesSerdes,
+            windowSize,
+            cacheFunction,
+            true
+        );
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetchAll(final 
long timeFrom,

Review comment:
       See my other comments: I think we do not need to add overloads for 
primitive types for the newly added APIs?



