mjsax commented on a change in pull request #11235: URL: https://github.com/apache/kafka/pull/11235#discussion_r692305627
########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java ########## @@ -72,27 +74,47 @@ public long segmentTimestamp(final Bytes key) { } /** - * {@inheritdoc} + * {@inheritDoc} * * This method is optimized for {@link RocksDBTimeOrderedWindowStore#all()} only. Key and time * range queries are not supported. */ @Override public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final Bytes binaryKeyTo, final long from, final long to) { - if (binaryKeyFrom != null || binaryKeyTo != null) { - throw new IllegalArgumentException("binaryKeyFrom/binaryKeyTo keys cannot be non-null. Key and time range queries are not supported."); + if (binaryKeyFrom == null && binaryKeyTo == null && from == 0 && to == Long.MAX_VALUE) { + return Iterator::hasNext; } - if (from != 0 && to != Long.MAX_VALUE) { - throw new IllegalArgumentException("from/to time ranges should be 0 to Long.MAX_VALUE. Key and time range queries are not supported."); + if (binaryKeyFrom != null && binaryKeyFrom.equals(binaryKeyTo) && from == to) { + + return iterator -> { + while (iterator.hasNext()) { + final Bytes bytes = iterator.peekNextKey(); + final Bytes keyBytes = Bytes + .wrap(TimeOrderedKeySchema.extractStoreKeyBytes(bytes.get())); + final long time = TimeOrderedKeySchema.extractStoreTimestamp(bytes.get()); + if (keyBytes.compareTo(binaryKeyFrom) >= 0 + && keyBytes.compareTo(binaryKeyTo) <= 0 + && time >= from + && time <= to) { + return true; + } + iterator.next(); + } + return false; + }; } - return iterator -> iterator.hasNext(); + throw new IllegalArgumentException("Key and time range queries are not supported."); } @Override public <S extends Segment> List<S> segmentsToSearch(final Segments<S> segments, final long from, final long to, final boolean forward) { - throw new UnsupportedOperationException(); + if (from != to) { + throw new IllegalArgumentException(""); Review comment: Ups. This one slipped... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org