mjsax commented on a change in pull request #11235:
URL: https://github.com/apache/kafka/pull/11235#discussion_r692305627



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java
##########
@@ -72,27 +74,47 @@ public long segmentTimestamp(final Bytes key) {
     }
 
     /**
-     * {@inheritdoc}
+     * {@inheritDoc}
      *
      * This method is optimized for {@link 
RocksDBTimeOrderedWindowStore#all()} only. Key and time
      * range queries are not supported.
      */
     @Override
     public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final 
Bytes binaryKeyTo, final long from, final long to) {
-        if (binaryKeyFrom != null || binaryKeyTo != null) {
-            throw new IllegalArgumentException("binaryKeyFrom/binaryKeyTo keys 
cannot be non-null. Key and time range queries are not supported.");
+        if (binaryKeyFrom == null && binaryKeyTo == null && from == 0 && to == 
Long.MAX_VALUE) {
+            return Iterator::hasNext;
         }
 
-        if (from != 0 && to != Long.MAX_VALUE) {
-            throw new IllegalArgumentException("from/to time ranges should be 
0 to Long.MAX_VALUE. Key and time range queries are not supported.");
+        if (binaryKeyFrom != null && binaryKeyFrom.equals(binaryKeyTo) && from 
== to) {
+
+            return iterator -> {
+                while (iterator.hasNext()) {
+                    final Bytes bytes = iterator.peekNextKey();
+                    final Bytes keyBytes = Bytes
+                        
.wrap(TimeOrderedKeySchema.extractStoreKeyBytes(bytes.get()));
+                    final long time = 
TimeOrderedKeySchema.extractStoreTimestamp(bytes.get());
+                    if (keyBytes.compareTo(binaryKeyFrom) >= 0
+                        && keyBytes.compareTo(binaryKeyTo) <= 0
+                        && time >= from
+                        && time <= to) {
+                        return true;
+                    }
+                    iterator.next();
+                }
+                return false;
+            };
         }
 
-        return iterator -> iterator.hasNext();
+        throw new IllegalArgumentException("Key and time range queries are not 
supported.");
     }
 
     @Override
     public <S extends Segment> List<S> segmentsToSearch(final Segments<S> 
segments, final long from, final long to, final boolean forward) {
-        throw new UnsupportedOperationException();
+        if (from != to) {
+            throw new IllegalArgumentException("");

Review comment:
       Ups. This one slipped...




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to